15
/* Sequential processes can use the library compiled with -D_NOMPI */
16
# include "mpidummy.h"
19
/* Parallel applications should use MPI to communicate file info and slices of data */
25
#include "adios_read.h"
26
#include "adios_read_hooks.h"
27
#include "adios_error.h"
30
#include <sys/queue.h>
32
#include <thin_portal.h>
33
#elif HAVE_INFINIBAND == 1
37
#include <sys/socket.h>
38
#include <sys/times.h>
39
#include <netinet/in.h>
42
#include <sys/types.h>
47
#include <gen_thread.h>
49
//#include "get_clock.h"
50
//#include "attributes.h"
51
//#include "memwatch.h"
57
#define DT_MAX_QUEUE_LENGTH 512
59
typedef struct _Queue_Item
63
FMStructDescList var_list;
64
int32_t rank; // rank of client from which the chunk comes
67
typedef struct _datatap_var_chunk
71
uint64_t *local_bounds; // ndims
72
uint64_t *global_bounds; // ndims
73
uint64_t *global_offsets; // ndims
74
struct _datata_var_chunk *next;
75
} datatap_var_chunk, *datatap_var_chunk_p;
77
typedef struct _datatap_var_info
82
enum ADIOS_DATATYPES type;
84
int time_dim; // -1 means no time dimension
87
datatap_var_chunk *chunks;
88
struct _datatap_var_info *next;
89
} datatap_var_info, *datatap_var_info_p;
92
//typedef struct _datatap_pg_info
96
// datatap_var_info *vars;
97
//} datatap_pg_info, *datatap_pg_info_p;
99
#define VAR_BITMAP_SIZE 16
102
typedef struct _datatap_read_file_data
105
char *group_name; // TODO: assume one group in file
107
// int timestep; // TODO: it's already in file_info
109
datatap_var_info *vars;
110
// FMStructDescList var_list; // TODO
112
// TODO: replicated meta-data info from peer readers
113
// it is a list of vars which is not present locally
115
datatap_var_info *vars_peer;
121
int host_language_fortran; // 1 for Fortran; 0 for C
123
char var_bitmap[VAR_BITMAP_SIZE]; // 128 bit for 128 var
125
// int num_vars_read;
126
// datatap_var_info *vars_read;
127
// datatap_var_info *vars_read_tail;
128
} datatap_read_file_data, *datatap_read_file_data_p;
130
typedef struct _datatap_read_method_data
132
pthread_t dt_server_thread;
134
// uint32_t dt_queue_max_length;
135
// pthread_mutex_t mutex;
136
// pthread_cond_t cond1;
137
// pthread_cond_t cond2;
144
// int dt_server_stop;
145
int num_io_dumps; // TODO: timestep
146
} datatap_read_method_data, *datatap_read_method_data_p;
149
// this sructure holds all global data for Datatap Read method
150
datatap_read_method_data *dt_read_data = NULL;
152
// compare used-providd varname with the full path name of variable v
153
// return zero if matches and non-zero otherwise
154
static int compare_var_name (char *varname, datatap_var_info *v)
156
if (varname[0] == '/') { // varname is full path
158
if(!strcmp(v->varpath, "/")) {
159
sprintf(fullpath, "/%s\0", v->varname);
162
sprintf(fullpath, "%s/%s\0", v->varpath, v->varname);
164
return strcmp(fullpath, varname);
166
else { // varname doesn't include path
167
return strcmp(v->varname, varname);
171
static FMField *find_field (char *name, char *path, FMFieldList flist)
174
char *full_path_name = get_full_path_name(name, path);
175
temp_name = getFixedName(full_path_name);
176
free(full_path_name);
178
while (f->field_name != NULL) {
179
if(!strcmp(temp_name, f->field_name)) {
187
char *name_pos = f->field_name + (strlen(f->field_name) - strlen(name));
188
if(!strncmp(path, f->field_name, strlen(path)) &&
189
!strcmp(name, name_pos)) {
201
int64_t read_array(datatap_read_file_data *ds, datatap_var_info *var,
202
uint64_t *start, uint64_t *count, void *data)
204
int type_size = common_read_type_size(var->type, NULL);
205
fprintf(stderr, "im here type_size %d %s:%d\n", type_size, __FILE__,__LINE__);
206
int64_t total_size = 0;
208
// go over the whole list of data chunks and reorganize arrays
209
Queue *data_q = ds->f_info->dt_queue;
211
// TODO: we should be able to access the queue since the dt server will not
213
ListElmt *current_chunk = list_head(data_q);
214
while(current_chunk) {
215
QueueItem *qi = (QueueItem *)current_chunk->data;
216
FMFieldList filed_list = qi->var_list->field_list; // TODO
218
// first, find the var
219
FMField *f = find_field(var->varname, var->varpath, filed_list);
222
// actually this will not happen because the filed_list will
223
// contain all vars even though some may not be written
224
current_chunk = current_chunk->next;
228
char *source_addr = (char *)qi->data + f->field_offset; // this is the address
231
// here we need to distinguish static and dynamic arrays
232
int dim_are_nums = 1;
235
char *dim_start = strchr(f->field_type, '[');
236
char *dim_end = strrchr(f->field_type, ']');
237
while(dim_start < dim_end) {
238
if(*dim_start == '[' || *dim_start == ']') {
241
} else if(!isdigit(*dim_start)) {
253
source_addr2 = source_addr;
257
case adios_unsigned_byte:
258
source_addr2 = *((char **)source_addr);
261
case adios_string: // TODO
262
return strlen((char *)source_addr) + 1;
263
//source_addr2 = *((char **)source_addr);
267
case adios_unsigned_short:
268
source_addr2 = *((short int **)source_addr);
272
case adios_unsigned_integer:
273
source_addr2 = *((int **)source_addr);
277
case adios_unsigned_long:
278
source_addr2 = *((long int **)source_addr);
282
source_addr2 = *((float **)source_addr);
286
source_addr2 = *((double **)source_addr);
289
case adios_long_double:
290
source_addr2 = *((long double **)source_addr);
294
case adios_double_complex:
296
adios_error(err_invalid_read_method, "complex data type is not supported.");
301
// find the array dimension info
303
datatap_var_chunk *chunk = var->chunks;
305
if(chunk->rank == qi->rank) {
313
if(!chunk) { // this chunk doesn't contain the var so skip it
314
current_chunk = current_chunk->next;
318
// now we copy data into user's read buffer
319
uint64_t global_start_r = 0;
320
for(j = var->ndims-1; j >= 0; j --) {
321
global_start_r *= chunk->global_bounds[j];
322
global_start_r += start[j];
326
total_size = type_size;
327
for(j = var->ndims-1; j >= 0; j --) {
328
total_size *= count[j];
331
for(j = 0; j < var->ndims; j ++) {
332
int lower = chunk->global_offsets[j] - start[j];
333
int higher = (start[j]+count[j]) - (chunk->global_offsets[j]+chunk->local_bounds[j]);
334
if(lower < 0 || higher < 0) {
335
// this means this chunk has some data which is not needed by this reader
336
// TODO: for pixie3d case, this will not happen
337
adios_error(err_invalid_read_method, "Datatap cannot support this decomposition.\n");
342
// find the smallest slice
344
uint64_t stride_size = type_size;
345
for(j = 0; j < var->ndims; j ++) {
346
int lower = chunk->global_offsets[j] - start[j];
347
int higher = (start[j]+count[j]) - (chunk->global_offsets[j]+chunk->local_bounds[j]);
348
if(lower == 0 && higher == 0) {
349
// this means this dimnesion fits, let's move on to the next dimension
350
stride_size *= chunk->local_bounds[j];
354
else if((lower > 0 && higher >= 0) ||
355
(lower >= 0 && higher > 0)) {
356
// this means this dimension is strided. this dimension is the highest possible
362
uint64_t num_strides = 1;
363
for(j = min_dim; j < var->ndims; j ++) {
364
num_strides *= chunk->local_bounds[j];
367
uint64_t current_pos[var->ndims]; // track current stride's starting global offset
368
for(j = 0; j < var->ndims; j ++) {
369
current_pos[j] = chunk->global_offsets[j];
372
char *stride_start_addr = source_addr2;
374
for(; k < num_strides; k ++) {
375
// calculate the coordinates within the bounding box
376
uint64_t my_start = 0;
377
for(j = var->ndims-1; j >= 0; j --) {
378
my_start *= count[j];
379
my_start += (current_pos[j] - chunk->global_offsets[j]);
381
char *position = (char *)data + my_start * type_size;
383
memcpy(position, stride_start_addr, stride_size);
385
// now advance to copy next stride
386
stride_start_addr += stride_size;
388
for(j = min_dim; j < var->ndims; j ++) {
390
if(current_pos[j] == chunk->global_offsets[j]+chunk->local_bounds[j]-1) {
391
// don't set should_stop so we move on to next dimension
392
current_pos[j] = chunk->global_offsets[j];
406
for(j = var->ndims-1; j >= 0; j --) {
407
// for each sub-chunk, test if it falls into the reading region
408
int lower = (chunk->global_offsets[j] >= start[j]) ? 1: 0;
409
int higher = ((chunk->global_offsets[j]+chunk->local_bounds[j]) <=
410
(start[j]+count[j])) ? 1: 0;
411
if(lower && higher) {
412
if(j == 0) { // copy the whole chunks
413
uint64_t my_start = 0;
414
uint64_t my_size = type_size;
416
for(k = var->ndims-1; k >= 0; k --) {
417
my_start *= chunk->global_bounds[k];
418
my_start += chunk->global_offsets[k];
419
my_size *= chunk->local_bounds[k];
423
for(z = 0; z < ; z ++) {
424
uint64_t z_offset = pix_record->zoffset + k;
425
for(y = 0; y < pix_record->ysize; y ++) {
426
unsigned long long y_offset = pix_record->yoffset + j;
427
unsigned long long position = z_offset * pix_record->nyd_plus_2 * pix_record->nxd_plus_2 +
428
y_offset * pix_record->nxd_plus_2 + pix_record->xoffset;
429
position -= p_data->start_pos;
430
memcpy(&(p_data->buffers[m].buffer[position]), v, pix_record->xsize * sizeof(double));
435
char *position = (char *)data + (my_start - global_start_r) * type_size;
436
memcpy(position, source_addr2, my_size);
439
fprintf(stderr, "%d rank=%d name = %s position=%p data=%p mystart = %lu global_start_r = %lu mysize = %lu %s:%d\n", ds->my_rank,chunk->rank,var->varname,position,data,my_start, global_start_r, my_size, __FILE__,__LINE__);
440
total_size += my_size;
441
fprintf(stderr, "im here %d %s:%d\n", ds->my_rank,__FILE__,__LINE__);
445
fprintf(stderr, "im here %s:%d\n", __FILE__,__LINE__);
452
adios_error(err_invalid_read_method, "Datatap cannot support this decomposition.\n");
456
// we are done with this chunk, now extract data from next chunk
460
current_chunk = current_chunk->next;
462
fprintf(stderr, "im here read rank %d var %s total size %ld %s:%d\n", ds->my_rank, var->varname, total_size,__FILE__,__LINE__);
464
// TODO: the data size is messed up because of ghost zone
465
total_size = type_size;
467
for(j = var->ndims-1; j >= 0; j --) {
468
total_size *= count[j];
474
int reorganize_array (QueueItem *qi, datatap_read_file_data *ds)
476
fprintf(stderr, "im here rank %d %s:%d\n",dt_read_data->dt_comm_rank,__FILE__,__LINE__);
477
// go through all variables user wants to read
478
datatap_pg_info *pg = NULL;
480
for(i = 0; i < ds->num_pgs; i ++) {
481
if(ds->pgs[i].rank == qi->rank) {
487
adios_error(err_unspecified, "cannot find pg.\n");
491
fprintf(stderr, "im here rank %d %s:%d\n",dt_read_data->dt_comm_rank,__FILE__,__LINE__);
492
datatap_var_info *var = ds->vars_read;
493
FMFieldList filed_list = qi->var_list->field_list; // TODO
495
// first search within the pg for this var
496
fprintf(stderr, "im here rank %d var name %s %s:%d\n",dt_read_data->dt_comm_rank,var->varname,__FILE__,__LINE__);
497
datatap_var_info *v = NULL;
498
for(i = 0; i < pg->num_vars; i ++) {
499
fprintf(stderr, "im here rank %d var name %s i %d %s:%d\n",dt_read_data->dt_comm_rank,var->varname,i,__FILE__,__LINE__);
500
//if(!strcmp(pg->vars[i].varname, var->varname)) {
501
if(!compare_var_name(var->varname, &(pg->vars[i]))) {
502
fprintf(stderr, "im here rank %d var name %s i %d %s:%d\n",dt_read_data->dt_comm_rank,var->varname,i,__FILE__,__LINE__);
505
fprintf(stderr, "im here rank %d var name %s i %d %s:%d\n",dt_read_data->dt_comm_rank,var->varname,i,__FILE__,__LINE__);
507
int type_size = common_read_type_size(v->type, NULL);
508
fprintf(stderr, "im here rank %d var name %s i %d %s:%d\n",dt_read_data->dt_comm_rank,var->varname,i,__FILE__,__LINE__);
509
FMField *f = find_field(v->varname, v->varpath, filed_list);
510
fprintf(stderr, "im here rank %d var name %s i %d %s:%d\n",dt_read_data->dt_comm_rank,var->varname,i,__FILE__,__LINE__);
511
char *source_addr = (char *)qi->data + f->field_offset; // this is the address
513
fprintf(stderr, "im here rank %d var name %s i %d %s:%d\n",dt_read_data->dt_comm_rank,var->varname,i,__FILE__,__LINE__);
516
case adios_unsigned_byte:
517
source_addr2 = *((char **)source_addr);
520
case adios_string: // TODO
521
return strlen((char *)source_addr) + 1;
522
source_addr2 = *((char **)source_addr);
526
case adios_unsigned_short:
527
source_addr2 = *((short int **)source_addr);
531
case adios_unsigned_integer:
532
source_addr2 = *((int **)source_addr);
536
case adios_unsigned_long:
537
source_addr2 = *((long int **)source_addr);
541
source_addr2 = *((float **)source_addr);
545
source_addr2 = *((double **)source_addr);
548
case adios_long_double:
549
source_addr2 = *((long double **)source_addr);
553
case adios_double_complex:
555
adios_error(err_invalid_read_method, "complex data type is not supported.");
558
//memcopy(var->read_buffer, source_addr2, var->read_buffer_size);
560
// now we need to calculate how to map chunks into read buffer
561
// according to dimension specification
562
// also we need to figure out which part should be shuffled to
563
// other peer processes and which part should be moved in from
565
// search along the slowest changing dimension and determine if
566
// the sub-chunk falls into the reading region. If so, copy the
567
// whole sub-chunk, otherwise search along the second slowest
568
// changing dimension within that subchunk, identify which part
569
// should go where and copy the remaining part to local buffer
570
fprintf(stderr, "im here rank %d var name %s i %d %s:%d\n",dt_read_data->dt_comm_rank,var->varname,i,__FILE__,__LINE__);
572
uint64_t global_start_r = 0;
573
for(j = var->ndims-1; j >= 0; j --) {
574
if(var->global_bounds[j]) {
575
global_start_r *= var->global_bounds[j];
576
global_start_r += var->global_offsets[j];
580
if(!strcmp(v->varname,"bconds")){
581
int *bconds = (int *)source_addr2;
584
fprintf(stderr, "bconds[%d]=%d\n",t,bconds[t]);
589
if(!strcmp(v->varname,"v1")){
590
double *v1 = (double *)source_addr2;
593
fprintf(stderr, "v1[%d]=%f\n",t,v1[t]);
598
fprintf(stderr, "im here rank %d var name %s i %d %s:%d\n",dt_read_data->dt_comm_rank,var->varname,i,__FILE__,__LINE__);
599
for(j = v->ndims-1; j >= 0; j --) {
600
// for each sub-chunk, test if it falls into the reading region
601
int lower = (v->global_offsets[j] >= var->global_offsets[j]) ? 1: 0;
602
int higher = ((v->global_offsets[j]+v->local_bounds[j]) <=
603
(var->global_offsets[j]+var->local_bounds[j])) ? 1: 0;
604
fprintf(stderr, "im here rank=%d j %d name %s go %lu lb %lu go %lu lb %lu %s:%d\n", dt_read_data->dt_comm_rank,j,var->varname,v->global_offsets[j],v->local_bounds[j],var->global_offsets[j],var->local_bounds[j],__FILE__,__LINE__);
605
if(lower && higher) {
606
fprintf(stderr, "im here j %d %s:%d\n", j,__FILE__,__LINE__);
607
if(j == 0) { // copy the whole chunks
608
uint64_t my_start = 0;
609
uint64_t my_size = type_size;
611
for(k = v->ndims-1; k >= 0; k --) {
612
my_start *= v->global_bounds[k];
613
my_start += v->global_offsets[k];
614
my_size *= v->local_bounds[k];
616
char *position = (char *)var->data + (my_start - global_start_r) * type_size;
617
fprintf(stderr, "rank=%d name = %s position=%lu data=%lu mystart = %lu global_start_r = %lu mysize = %lu %s:%d\n", dt_read_data->dt_comm_rank,var->varname,position,var->data,my_start, global_start_r, my_size, __FILE__,__LINE__);
618
memcpy(position, source_addr2, my_size);
619
fprintf(stderr, "im here j %d %s:%d\n", j,__FILE__,__LINE__);
628
fprintf(stderr, "im here rank=%d j %d name %s go %lu lb %lu go %lu lb %lu %s:%d\n", dt_read_data->dt_comm_rank,j,var->varname,v->global_offsets[j],v->local_bounds[j],var->global_offsets[j],var->local_bounds[j],__FILE__,__LINE__);
629
fprintf(stderr, "im here j %d %s:%d\n", j,__FILE__,__LINE__);
631
adios_error(err_invalid_read_method, "Datatap cannot support this decomposition.\n");
634
fprintf(stderr, "im here j %d %s:%d\n", j,__FILE__,__LINE__);
637
// end of data re-organization for this var
641
// now process the next var to read
644
fprintf(stderr, "im here rank %d %s:%d\n",dt_read_data->dt_comm_rank,__FILE__,__LINE__);
650
void data_handler (void *data, int length, void *user_data, attr_list attr, int rank, void *timing, file_info *f)
652
recvtime *r = (recvtime*)timing;
653
IOhandle *h = (IOhandle*)user_data;
654
elapsedtime *e = updateTimes(h, r, length);
656
// decode the data and insert the data into the queue
657
int decoded_length = FFS_est_decode_length(h->iocontext, data, length);
659
// TODO: make sure we free this later (after writing to hdf5 file)
660
char *decoded_data = (char *)malloc(decoded_length);
663
adios_error(err_no_memory, "Cannot allocate memory for Datatap.");
667
FFSTypeHandle ffshandle = FFSTypeHandle_from_encode(h->iocontext, data);
668
FMFormat form = FMFormat_of_original(ffshandle);
671
FMStructDescList var_list = get_localized_formats(form);
672
establish_conversion(h->iocontext, ffshandle, var_list);
673
FFSdecode_to_buffer(h->iocontext, data, decoded_data);
675
// The encoded data can be recycled now
676
returnbuffer(h, data, length);
678
QueueItem * qi =(QueueItem *) malloc(sizeof(QueueItem));
680
adios_error(err_no_memory, "Cannot allocate memory for Datatap.");
683
qi->data = decoded_data;
684
qi->length = decoded_length;
685
qi->var_list = var_list;
686
qi->rank = rank; // TODO: hack it!
688
// put message in queue
689
pthread_mutex_lock(&(f->mutex));
690
// while(queue_size(f->dt_queue) >= f->dt_queue_max_length) {
691
// // wait until queue is not full
692
// pthread_cond_signal(&(dt_read_data->cond2));
693
// pthread_cond_wait(&(dt_read_data->cond1), &(dt_read_data->mutex));
695
queue_enqueue(f->dt_queue, qi);
696
if(queue_size(f->dt_queue) == f->num_chunks) {
697
pthread_cond_signal(&(f->cond2));
699
pthread_mutex_unlock(&(f->mutex));
702
fprintf(stderr, "im here rank= %d client rank=%d data handler done %s:%d\n",h->rank,rank,__FILE__,__LINE__);
704
// TODO: check if it's time to stop
707
void * dt_server_thread_func (void *arg)
709
fprintf(stderr, "im here %s:%d\n",__FILE__,__LINE__);
710
MPI_Comm orig_comm = (MPI_Comm) arg;
713
// duplicate a MPI communicator for synchronization between dt servers
714
int rc = MPI_Comm_dup(orig_comm, &(dt_read_data->dt_comm));
715
if(rc != MPI_SUCCESS) {
716
error(err_unspecified, "Cannot duplicate communicator for Datatap.");
720
dt_read_data->dt_comm = orig_comm;
723
fprintf(stderr, "im here %s:%d\n",__FILE__,__LINE__);
726
dt_read_data->dt_comm_size = 1;
727
dt_read_data->dt_comm_rank = 0;
729
MPI_Comm_size(dt_read_data->dt_comm, &(dt_read_data->dt_comm_size));
730
MPI_Comm_rank(dt_read_data->dt_comm, &(dt_read_data->dt_comm_rank));
733
fprintf(stderr, "im here %s:%d\n",__FILE__,__LINE__);
734
// initialize ptlpbio interface
735
dt_read_data->dt_cm = CManager_create();
736
fprintf(stderr, "im here %s:%d\n",__FILE__,__LINE__);
737
CMlisten_specific(dt_read_data->dt_cm, NULL);
739
fprintf(stderr, "im here %s:%d\n",__FILE__,__LINE__);
742
dt_read_data->dt_handle = EVthin_portals_listen(dt_read_data->dt_cm, 120,
743
0, data_handler, dt_read_data->dt_comm);
747
appid = globals_adios_get_application_id(&was_set);
749
adios_error(err_unspecified, "Application ID was not set.");
750
sprintf(param_file, "datatap_param\0");
753
sprintf(param_file, "datatap_param%d\0", appid);
756
// dt server(rank 0) gather contact info from other servers and write to
757
// a file so upstream writers can connect to this application
758
outputConnectionData(param_file, dt_read_data->dt_handle);
760
fprintf(stderr, "im here %s:%d\n",__FILE__,__LINE__);
761
dt_read_data->dt_server_ready = 1;
764
CMrun_network(dt_read_data->dt_cm);
766
// TODO: cleanup and exit
767
CManager_close(dt_read_data->dt_cm);
771
int adios_read_datatap_init (MPI_Comm comm)
773
fprintf(stderr, "im here %s:%d\n",__FILE__,__LINE__);
774
setenv("CMSelfFormats", "1", 1);
776
// initialize Datatap read method structure
777
dt_read_data = (datatap_read_method_data *) malloc(sizeof(datatap_read_method_data));
779
adios_error(err_no_memory, "Cannot allocate memory for Datatap.");
782
memset(dt_read_data, 0, sizeof(datatap_read_method_data));
784
// enable threading for EVPath
787
fprintf(stderr, "im here %s:%d\n",__FILE__,__LINE__);
789
// set up queue for incoming data chunks
790
dt_read_data->dt_queue =(Queue *) calloc(1, sizeof(Queue));
791
if(!dt_read_data->dt_queue) {
792
adios_error(err_no_memory, "Cannot allocate memory for Datatap.");
795
queue_init(dt_read_data->dt_queue, free);
796
dt_read_data->dt_queue_max_length = DT_MAX_QUEUE_LENGTH;
798
pthread_mutex_init(&(dt_read_data->mutex), NULL);
799
pthread_cond_init(&(dt_read_data->cond1), NULL);
800
pthread_cond_init(&(dt_read_data->cond2), NULL);
803
// fork the thread to poll network
804
int rc = pthread_create(&(dt_read_data->dt_server_thread), NULL,
805
dt_server_thread_func, (void *)comm);
807
adios_error(err_unspecified, "Failed to create Datatap server thread.");
811
// TODO: wait until the dt server thread is ready
812
while(!dt_read_data->dt_server_ready) { }
814
fprintf(stderr, "im here %s:%d\n",__FILE__,__LINE__);
818
int adios_read_datatap_finalize ()
820
// notify and wait for dt server thread to exit
821
datatap_stop_server(dt_read_data->dt_handle);
822
pthread_join(dt_read_data->dt_server_thread, NULL);
825
// TODO: we need a datatap cleanup function
826
pthread_mutex_destroy(&(dt_read_data->mutex));
827
pthread_cond_destroy(&(dt_read_data->cond1));
828
pthread_cond_destroy(&(dt_read_data->cond2));
829
free(dt_read_data->dt_queue);
836
ADIOS_FILE *adios_read_datatap_fopen(const char *fname, MPI_Comm comm)
841
datatap_read_file_data *ds = (datatap_read_file_data *) malloc(sizeof(datatap_read_file_data));
843
adios_error(err_no_memory, "Cannot allocate memory for Datatap.");
846
ds->file_name = strdup(fname);
848
// here we need to syncrhonize with other reader processes to see what they find
849
// 1: file available locally
851
int my_status, global_status;
852
int total_num_readers, my_rank;
855
total_num_readers = 1;
858
MPI_Comm_size(comm, &total_num_readers);
859
MPI_Comm_rank(comm, &my_rank);
863
ds->my_rank = my_rank;
864
ds->comm_size = total_num_readers;
866
// first we need to make sure the file has been 'written'
867
// TODO: always start from timstep 0
869
ds->f_info = datatap_get_file_info(dt_read_data->dt_handle, fname, dt_read_data->num_io_dumps, &is_EOF);
871
// now I should tell my peer readers that I have seen this file available locally
873
if(total_num_readers > 1) {
874
int rc = MPI_Allreduce(&my_status, &global_status, 1, MPI_INT, MPI_SUM, comm);
875
if(rc != MPI_SUCCESS) {
876
fprintf(stderr, "something bad happened somewhere.\n");
882
// now I can move on to process meta-data
883
fprintf(stderr, "im here rank %d move on %s:%d\n", my_rank, __FILE__,__LINE__);
887
adios_errno = err_end_of_file;
888
adios_error(err_end_of_file, "Reach the end of file (%s).", fname);
892
if(total_num_readers > 1) {
893
// now I should wait for my peer readers to see EOF in their local context
894
int rc = MPI_Barrier(comm);
895
if(rc != MPI_SUCCESS) {
896
fprintf(stderr, "something bad happened somewhere.\n");
902
if(total_num_readers > 1) {
904
// I didn't see the file available in my local context, but my peer readers may
905
// have seen it, so I should ask them to figure out
906
fprintf(stderr, "im here rank %d ask %s:%d\n", my_rank, __FILE__,__LINE__);
908
MPI_Allreduce(&my_status, &global_status, 1, MPI_INT, MPI_SUM, comm);
909
if(global_status == 0) { // no one see file available
910
fprintf(stderr, "im here rank %d no file %s:%d\n", my_rank, __FILE__,__LINE__);
911
adios_errno = err_file_not_found_error;
912
adios_error(err_file_not_found_error, "Cannot find file (%s).", fname);
913
fprintf(stderr, "im here rank %d no file %s:%d\n", my_rank, __FILE__,__LINE__);
919
// some one has seen this file available, so I should keep polling instead of return
921
fprintf(stderr, "im here rank %d %s:%d\n", my_rank, __FILE__,__LINE__);
923
ds->f_info = datatap_get_file_info(dt_read_data->dt_handle,
924
fname, dt_read_data->num_io_dumps, &is_EOF);
927
fprintf(stderr, "im here rank %d %s:%d\n", my_rank, __FILE__,__LINE__);
932
adios_errno = err_end_of_file;
933
adios_error(err_end_of_file, "Reach the end of file (%s).", fname);
937
int rc = MPI_Barrier(comm);
938
if(rc != MPI_SUCCESS) {
939
fprintf(stderr, "something bad happened somewhere.\n");
944
usleep(10000); // TODO: we need to set a proper value
945
fprintf(stderr, "im here rank %d %s:%d\n", my_rank, __FILE__,__LINE__);
954
adios_errno = err_file_not_found_error;
955
adios_error(err_file_not_found_error, "Cannot find file (%s).", fname);
956
fprintf(stderr, "im here rank %d no file %s:%d\n", my_rank, __FILE__,__LINE__);
963
fprintf(stderr, "im here rank %d see file %s:%d\n", my_rank, __FILE__,__LINE__);
965
// we don't know what to read yet
968
// ds->num_vars_read = 0;
969
// ds->vars_read = NULL;
970
// ds->vars_read_tail = NULL;
972
// TODO: add a loop over chunks
974
for(i = 0; i < ds->f_info->num_chunks; i ++) {
975
chunk_info *current_chunk = &(ds->f_info->chunks[i]);
977
fprintf(stderr, "im here %s:%d\n", __FILE__,__LINE__);
978
// parse the var info
979
char *current_pos = current_chunk->var_info_buffer;
980
int total_var_info_size = *(int *)current_pos; // total size
984
ds->host_language_fortran = *(enum ADIOS_FLAG *)current_pos; // host language
988
int group_name_len = *(int *)current_pos; // size of group name
991
if(i == 0) { // TODO: let's assume one group name
992
ds->group_name = strdup(current_pos); // group name
994
current_pos += group_name_len;
996
int num_vars_in_pg = *(int *)current_pos; // total num of vars
998
fprintf(stderr, "im here info size %d num vars %d %s:%d\n", total_var_info_size,num_vars_in_pg,__FILE__,__LINE__);
1000
datatap_var_info *current_var;
1001
char *end = current_chunk->var_info_buffer + total_var_info_size;
1002
while(current_pos < end) {
1003
// size of this var info
1004
int var_info_size = *(int *) current_pos;
1005
current_pos += sizeof(int);
1006
int var_id = *(int *) current_pos;
1007
current_pos += sizeof(int);
1008
int varname_len = *(int *) current_pos;
1009
current_pos += sizeof(int);
1010
char *varname = current_pos;
1011
current_pos += varname_len;
1012
int varpath_len = *(int *) current_pos;
1013
current_pos += sizeof(int);
1014
char *varpath = current_pos;
1015
current_pos += varpath_len;
1017
// now we go through the current list of vars to see if this is new
1018
current_var = ds->vars;
1019
while(current_var != NULL) {
1020
// TODO: compare var id
1021
//if(!strcmp(current_var->varname, varname) &&
1022
// !strcmp(current_var->varpath, varpath)) {
1023
if(var_id == current_var->id) {
1027
current_var = current_var->next;
1031
if(!current_var) { // this is a new var
1032
current_var = (datatap_var_info *) malloc(sizeof(datatap_var_info));
1034
adios_error(err_no_memory, "Cannot allocate memory for Datatap.");
1037
current_var->id = var_id;
1038
current_var->varname = strdup(varname);
1039
current_var->varpath = strdup(varpath);
1040
current_var->type = *(enum ADIOS_DATATYPES *) current_pos;
1041
current_pos += sizeof(enum ADIOS_DATATYPES);
1042
current_var->time_dim = *(int *) current_pos;
1043
current_pos += sizeof(int);
1044
current_var->ndims = *(int *) current_pos;
1045
current_pos += sizeof(int);
1046
current_var->num_chunks = 0;
1047
current_var->chunks = NULL;
1048
if(!current_var->ndims) { // scalars and strings
1049
current_var->data_size = common_read_type_size(current_var->type, current_pos);
1051
fprintf(stderr, "im here %s %s %d %s:%d\n", current_var->varname, current_var->varpath, current_var->ndims, __FILE__,__LINE__);
1052
current_var->next = ds->vars;
1053
ds->vars = current_var;
1057
current_pos += sizeof(enum ADIOS_DATATYPES);
1058
//current_var->time_dim = *(int *) current_pos;
1059
current_pos += sizeof(int);
1060
//current_var->ndims = *(int *) current_pos;
1061
current_pos += sizeof(int);
1064
datatap_var_chunk *new_chunk = (datatap_var_chunk *) malloc(sizeof(datatap_var_chunk));
1066
adios_error(err_no_memory, "Cannot allocate memory for Datatap.");
1069
new_chunk->next = current_var->chunks;
1070
current_var->chunks = new_chunk;
1071
current_var->num_chunks ++;
1073
new_chunk->rank = current_chunk->rank;
1074
if(!current_var->ndims) { // scalars and strings
1076
new_chunk->data = malloc(current_var->data_size);
1077
if(!new_chunk->data) {
1078
adios_error(err_no_memory, "Cannot allocate memory for Datatap.");
1081
memcpy(new_chunk->data, current_pos, current_var->data_size);
1082
current_pos += current_var->data_size;
1085
new_chunk->local_bounds = (uint64_t *) malloc(current_var->ndims * sizeof(uint64_t));
1086
if(!new_chunk->local_bounds) {
1087
adios_error(err_no_memory, "Cannot allocate memory for Datatap.");
1090
new_chunk->global_bounds = (uint64_t *) malloc(current_var->ndims * sizeof(uint64_t));
1091
if(!new_chunk->global_bounds) {
1092
adios_error(err_no_memory, "Cannot allocate memory for Datatap.");
1095
new_chunk->global_offsets = (uint64_t *) malloc(current_var->ndims * sizeof(uint64_t));
1096
if(!new_chunk->global_offsets) {
1097
adios_error(err_no_memory, "Cannot allocate memory for Datatap.");
1101
for(i = 0; i < current_var->ndims; i ++) {
1102
new_chunk->local_bounds[i] = *(uint64_t *)current_pos;
1103
current_pos += sizeof(uint64_t);
1104
new_chunk->global_bounds[i] = *(uint64_t *)current_pos;
1105
current_pos += sizeof(uint64_t);
1106
new_chunk->global_offsets[i] = *(uint64_t *)current_pos;
1107
current_pos += sizeof(uint64_t);
1111
// TODO: at this point, we no longer need var_info_buffer so free it
1112
// free(current_chunk->var_info_buffer);
1116
// TODO: replicate meta-data among peer reader
1117
ds->num_vars_peer = 0;
1118
ds->vars_peer = NULL;
1120
if(total_num_readers > 1)
1122
char *send_buf = NULL;
1123
char *recv_buf = NULL;
1124
int send_count = VAR_INFO_SIZE, recv_count = VAR_INFO_SIZE;
1126
for(i = 0; i < ds->f_info->num_chunks; i ++) {
1127
chunk_info *current_chunk = &(ds->f_info->chunks[i]);
1128
fprintf(stderr, "im here rank %d %d %s:%d\n", ds->my_rank, current_chunk->rank,__FILE__,__LINE__);
1129
if(current_chunk->rank == 0) {
1130
send_buf = ds->f_info->chunks[i].var_info_buffer;
1135
// TODO: before we do this, we need to make sure this one has sth special
1136
// the only case we need to deal with is local array with only one chunk
1137
// we will check this when calling read_var
1139
fprintf(stderr, "im here rank %d %s:%d\n", ds->my_rank, __FILE__,__LINE__);
1140
int rc = MPI_Bcast(send_buf, send_count, MPI_BYTE, 0, comm);
1141
if(rc != MPI_SUCCESS) {
1142
fprintf(stderr, "rank %d: MPI_Scatter returns error (%d). %s:%d\n",
1143
my_rank, rc, __FILE__, __LINE__);
1148
recv_buf = (char *) malloc(VAR_INFO_SIZE);
1150
adios_error(err_no_memory, "Cannot allocate memory for Datatap.");
1154
fprintf(stderr, "im here rank %d %s:%d\n", ds->my_rank, __FILE__,__LINE__);
1155
int rc = MPI_Bcast(recv_buf, recv_count, MPI_BYTE, 0, comm);
1156
if(rc != MPI_SUCCESS) {
1157
fprintf(stderr, "rank %d: MPI_Scatter returns error (%d). %s:%d\n",
1158
my_rank, rc, __FILE__, __LINE__);
1164
// parse the var_info buffer
1165
char *current_pos = recv_buf;
1166
int total_var_info_size = *(int *)current_pos; // total size
1169
current_pos += 4; // host language
1171
int group_name_len = *(int *)current_pos; // size of group name
1173
current_pos += group_name_len;
1175
int num_vars = *(int *)current_pos; // total num of vars
1178
datatap_var_info *current_var;
1179
char *end = recv_buf + total_var_info_size;
1180
while(current_pos < end) {
1181
// size of this var info
1182
int var_info_size = *(int *) current_pos;
1183
char *start_of_next_var = current_pos + var_info_size;
1184
current_pos += sizeof(int);
1185
int var_id = *(int *) current_pos;
1186
current_pos += sizeof(int);
1187
int varname_len = *(int *) current_pos;
1188
current_pos += sizeof(int);
1189
char *varname = current_pos;
1190
current_pos += varname_len;
1191
int varpath_len = *(int *) current_pos;
1192
current_pos += sizeof(int);
1193
char *varpath = current_pos;
1194
current_pos += varpath_len;
1196
// now we go through the current list of vars to see if this is new
1197
current_var = ds->vars;
1198
while(current_var != NULL) {
1199
//if(!strcmp(current_var->varname, varname) &&
1200
// !strcmp(current_var->varpath, varpath)) {
1201
if(var_id == current_var->id) {
1202
// this var is locally available, so skip it
1206
current_var = current_var->next;
1210
if(current_var) { // this var is locally available
1211
current_pos = start_of_next_var;
1215
current_var = (datatap_var_info *) malloc(sizeof(datatap_var_info));
1217
adios_error(err_no_memory, "Cannot allocate memory for Datatap.");
1220
current_var->id = var_id;
1221
current_var->varname = strdup(varname);
1222
current_var->varpath = strdup(varpath);
1223
current_var->type = *(enum ADIOS_DATATYPES *) current_pos;
1224
current_pos += sizeof(enum ADIOS_DATATYPES);
1225
current_var->time_dim = *(int *) current_pos;
1226
current_pos += sizeof(int);
1227
current_var->ndims = *(int *) current_pos;
1228
current_pos += sizeof(int);
1229
current_var->num_chunks = 0;
1230
current_var->chunks = NULL;
1231
if(!current_var->ndims) { // scalars and strings
1232
current_var->data_size = common_read_type_size(current_var->type, current_pos);
1234
current_var->next = ds->vars_peer;
1235
ds->vars_peer = current_var;
1236
ds->num_vars_peer ++;
1238
datatap_var_chunk *new_chunk = (datatap_var_chunk *) malloc(sizeof(datatap_var_chunk));
1240
adios_error(err_no_memory, "Cannot allocate memory for Datatap.");
1243
new_chunk->next = current_var->chunks;
1244
current_var->chunks = new_chunk;
1245
current_var->num_chunks ++;
1247
new_chunk->rank = 0;
1248
if(!current_var->ndims) { // scalars and strings
1250
new_chunk->data = malloc(current_var->data_size);
1251
if(!new_chunk->data) {
1252
adios_error(err_no_memory, "Cannot allocate memory for Datatap.");
1255
memcpy(new_chunk->data, current_pos, current_var->data_size);
1256
current_pos += current_var->data_size;
1259
new_chunk->local_bounds = (uint64_t *) malloc(current_var->ndims * sizeof(uint64_t));
1260
if(!new_chunk->local_bounds) {
1261
adios_error(err_no_memory, "Cannot allocate memory for Datatap.");
1264
new_chunk->global_bounds = (uint64_t *) malloc(current_var->ndims * sizeof(uint64_t));
1265
if(!new_chunk->global_bounds) {
1266
adios_error(err_no_memory, "Cannot allocate memory for Datatap.");
1269
new_chunk->global_offsets = (uint64_t *) malloc(current_var->ndims * sizeof(uint64_t));
1270
if(!new_chunk->global_offsets) {
1271
adios_error(err_no_memory, "Cannot allocate memory for Datatap.");
1275
for(i = 0; i < current_var->ndims; i ++) {
1276
new_chunk->local_bounds[i] = *(uint64_t *)current_pos;
1277
current_pos += sizeof(uint64_t);
1278
new_chunk->global_bounds[i] = *(uint64_t *)current_pos;
1279
current_pos += sizeof(uint64_t);
1280
new_chunk->global_offsets[i] = *(uint64_t *)current_pos;
1281
current_pos += sizeof(uint64_t);
1290
fprintf(stderr, "im here %d %d %d %s:%d\n", ds->host_language_fortran, adios_flag_yes, futils_is_called_from_fortran(), __FILE__, __LINE__);
1291
// TODO: here we need to ajust array dimension if the reader is in C
1292
// but writer is in Fortran or vice versa
1293
if(ds->host_language_fortran == adios_flag_yes && !futils_is_called_from_fortran()) {
1294
// reader is in C but writer is in Fortran, there are several things to adjust:
1295
// array dimension index starts from 1 in Fortran --> start from 0 in C
1296
// change array dimension order (including time dimension)
1297
datatap_var_info *v = ds->vars;
1299
datatap_var_chunk *chunk = v->chunks;
1303
for(i = 0; i < v->ndims/2; i ++) {
1304
temp = chunk->local_bounds[v->ndims-i-1];
1305
chunk->local_bounds[v->ndims-i-1] = chunk->local_bounds[i];
1306
chunk->local_bounds[i] = temp;
1307
temp = chunk->global_bounds[v->ndims-i-1];
1308
chunk->global_bounds[v->ndims-i-1] = chunk->global_bounds[i];
1309
chunk->global_bounds[i] = temp;
1310
temp = chunk->global_offsets[v->ndims-i-1];
1311
chunk->global_offsets[v->ndims-i-1] = chunk->global_offsets[i];
1312
chunk->global_offsets[i] = temp;
1314
chunk = chunk->next;
1316
if(v->time_dim > 0) { // -1 means no time dimension
1317
v->time_dim = v->ndims - v->time_dim;
1324
datatap_var_chunk *chunk = v->chunks;
1327
for(i = 0; i < v->ndims; i ++) {
1328
chunk->local_bounds[i] --;
1329
//chunk->global_offsets[i] --;
1332
for(i = 0; i < v->ndims/2; i ++) {
1333
temp = chunk->local_bounds[v->ndims-i-1];
1334
chunk->local_bounds[v->ndims-i-1] = chunk->local_bounds[i];
1335
chunk->local_bounds[i] = temp;
1336
temp = chunk->global_bounds[v->ndims-i-1];
1337
chunk->global_bounds[v->ndims-i-1] = chunk->global_bounds[i];
1338
chunk->global_bounds[i] = temp;
1339
temp = chunk->global_offsets[v->ndims-i-1];
1340
chunk->global_offsets[v->ndims-i-1] = chunk->global_offsets[i];
1341
chunk->global_offsets[i] = temp;
1343
chunk = chunk->next;
1345
v->time_dim = v->ndims - v->time_dim - 1;
1349
else if(ds->host_language_fortran == adios_flag_no && futils_is_called_from_fortran()) {
1350
// adjuct dimension C --> Fortran
1351
// TODO: for the demo, this will not happen so leave it here
1354
fp = (ADIOS_FILE *) malloc(sizeof(ADIOS_FILE));
1356
adios_error(err_no_memory, "Cannot allocate memory for file info.");
1361
fp->fh = (uint64_t) ds;
1362
fp->groups_count = 1; // TODO: assume one group per file
1363
fp->vars_count = 0; // TODO: just do not use this filed
1364
fp->attrs_count = 0; // TODO: do not support attributes yet
1366
// TODO: since we require fopen/fclose for each timestep,
1367
// so there is always only 1 timestep in file
1368
// TODO: set ntimesteps to be max to make pixie3d working
1369
fp->tidx_start = ds->f_info->timestep;
1370
fp->ntimesteps = INT32_MAX;
1371
fprintf(stderr, "im here timestep %d %s:%d\n", fp->tidx_start = ds->f_info->timestep, __FILE__,__LINE__);
1376
alloc_namelist(&fp->group_namelist, fp->groups_count);
1377
for (i = 0; i < fp->groups_count; i++) {
1378
if (!fp->group_namelist[i]) {
1379
adios_error(err_no_memory, "Cannot allocate buffer in adios_fopen()");
1383
strcpy(fp->group_namelist[i], ds->group_name);
1386
fprintf(stderr, "im here %s:%d\n", __FILE__,__LINE__);
1388
// TODO: return code of adios_errno for fopen:
1389
// 0: file metadata is available and everything is success
1390
// we need a value telling that the file is not available yet
1391
// we also need a value telling that writer is finishing so don't
1392
// read this file any more
1397
int adios_read_datatap_fclose(ADIOS_FILE *fp)
1399
fprintf(stderr, "im here rank %d %s:%d\n",dt_read_data->dt_comm_rank,__FILE__,__LINE__);
1400
datatap_read_file_data *ds = (datatap_read_file_data *) fp->fh;
1404
dt_read_data->num_io_dumps ++;
1406
// recycle the queue
1409
while((!queue_dequeue(ds->f_info->dt_queue, &qi)) && qi) {
1414
// now we 'read' the data into user-provided buffers
1415
// note that the data is actually moved by dt servr thread
1416
// here we just re-organize the data into the distribution
1420
// process the incoming data chunks
1423
uint64_t num_chunks_processed = 0;
1427
pthread_mutex_lock(&(ds->f_info->mutex));
1428
while(queue_size(ds->f_info->dt_queue) == 0) {
1429
fprintf(stderr, "im here num_chunks_processed %d %d %s:%d\n",num_chunks_processed, ds->num_pgs,__FILE__,__LINE__);
1430
// check if it's time to finish this file
1431
if(num_chunks_processed == ds->num_pgs) { // TODO
1432
fprintf(stderr, "im here %s:%d\n",__FILE__,__LINE__);
1433
dt_read_data->num_io_dumps ++;
1434
pthread_mutex_unlock(&(ds->f_info->mutex));
1439
fprintf(stderr, "im here %s:%d\n",__FILE__,__LINE__);
1440
pthread_cond_signal(&(ds->f_info->cond1));
1441
pthread_cond_wait(&(ds->f_info->cond2), &(ds->f_info->mutex));
1444
fprintf(stderr, "im here %s:%d\n",__FILE__,__LINE__);
1447
fprintf(stderr, "im here %s:%d\n",__FILE__,__LINE__);
1451
// start of busy time
1452
while((!queue_dequeue(ds->f_info->dt_queue, &d)) && d) {
1453
//pthread_cond_signal(&(ds->f_info->cond1));
1454
pthread_mutex_unlock(&(ds->f_info->mutex));
1456
qi =(QueueItem *) d;
1458
fprintf(stderr, "im here rank %d %s:%d\n",dt_read_data->dt_comm_rank,__FILE__,__LINE__);
1459
int rc = reorganize_array (qi, ds);
1460
fprintf(stderr, "im here %s:%d\n",__FILE__,__LINE__);
1462
adios_error(err_unspecified, "Error in reorganize_array() function.");
1466
// recycle decoded data buffer
1470
num_chunks_processed ++;
1472
fprintf(stderr, "im here %s:%d\n",__FILE__,__LINE__);
1473
pthread_mutex_unlock(&(ds->f_info->mutex));
1474
fprintf(stderr, "im here %s:%d\n",__FILE__,__LINE__);
1475
pthread_mutex_lock(&(ds->f_info->mutex));
1476
fprintf(stderr, "im here %s:%d\n",__FILE__,__LINE__);
1478
fprintf(stderr, "im here %s:%d\n",__FILE__,__LINE__);
1480
pthread_mutex_unlock(&(ds->f_info->mutex));
1484
fprintf(stderr, "im here %s:%d\n",__FILE__,__LINE__);
1485
// notify dt server thread that we are done with this file
1486
int rc = datatap_release_file(dt_read_data->dt_handle, ds->f_info);
1488
adios_error(err_unspecified, "Could not close file.");
1492
free_namelist((fp->group_namelist), fp->groups_count);
1493
if (ds->file_name) {
1494
free(ds->file_name);
1495
ds->file_name = NULL;
1498
if (ds->group_name) {
1499
free(ds->group_name);
1500
ds->group_name = NULL;
1503
// TODO release pg_info and var_info
1504
datatap_var_info *v = ds->vars;
1505
datatap_var_info *tmp_v;
1510
datatap_var_chunk *chunk = v->chunks;
1511
datatap_var_chunk *tmp_chunk;
1515
free(chunk->local_bounds);
1516
free(chunk->global_bounds);
1517
free(chunk->global_offsets);
1522
chunk = chunk->next;
1529
fprintf(stderr, "im here %s:%d\n",__FILE__,__LINE__);
1535
datatap_var_chunk *chunk = v->chunks;
1536
datatap_var_chunk *tmp_chunk;
1540
free(chunk->local_bounds);
1541
free(chunk->global_bounds);
1542
free(chunk->global_offsets);
1547
chunk = chunk->next;
1553
fprintf(stderr, "im here %s:%d\n",__FILE__,__LINE__);
1557
fprintf(stderr, "im here %s:%d\n",__FILE__,__LINE__);
1561
ADIOS_GROUP * adios_read_datatap_gopen (ADIOS_FILE *fp, const char *grpname)
1563
fprintf(stderr, "im here %s:%d\n", __FILE__,__LINE__);
1565
adios_error(err_invalid_group, "Group name is not valid");
1569
for(i = 0; i < fp->groups_count; i ++) {
1570
if(!strcmp(grpname, fp->group_namelist[i])) {
1571
return adios_read_datatap_gopen_byid(fp, i);
1574
adios_error(err_invalid_group, "Group %s is not valid", grpname);
1578
ADIOS_GROUP * adios_read_datatap_gopen_byid (ADIOS_FILE *fp, int grpid)
1580
fprintf(stderr, "im here %s:%d\n", __FILE__,__LINE__);
1581
datatap_read_file_data *ds = (datatap_read_file_data *) fp->fh;
1586
gp = (ADIOS_GROUP *) malloc(sizeof(ADIOS_GROUP));
1588
adios_error(err_no_memory, "Could not allocate memory for group info");
1592
// TODO: again, assume one group per file
1594
gp->gh = (uint64_t) 0; // TODO: we should re-organize the metadata
1596
gp->attrs_count = 0; // attributes are not supported yet
1598
fprintf(stderr, "im here rank %d %s:%d\n", ds->my_rank, __FILE__,__LINE__);
1599
// generate a list of variables with distinct names among all pgs
1600
gp->vars_count = ds->num_vars + ds->num_vars_peer;
1601
fprintf(stderr, "im here count %d %s:%d\n", gp->vars_count,__FILE__,__LINE__);
1603
// to return a globally consistently ordered var list, we sort the list by var id
1604
datatap_var_info **vars = (datatap_var_info *) malloc(sizeof(datatap_var_info *) * gp->vars_count);
1606
adios_error(err_no_memory, "Cannot allocate buffer in adios_read_datatap_gopen_byid()");
1609
memset(vars, 0, sizeof(datatap_var_info *) * gp->vars_count);
1611
fprintf(stderr, "im here rank %d %s:%d\n", ds->my_rank, __FILE__,__LINE__);
1612
datatap_var_info *current_var = ds->vars_peer;
1613
datatap_var_info *var_to_sort = current_var;
1614
while(var_to_sort) {
1616
for(j = 0; j < gp->vars_count; j ++) {
1617
if(vars[j] == NULL) {
1618
vars[j] = var_to_sort;
1619
current_var = current_var->next;
1620
var_to_sort = current_var;
1623
else if(vars[j]->id < var_to_sort->id) {
1625
datatap_var_info *tmp;
1627
vars[j] = var_to_sort;
1633
fprintf(stderr, "im here rank %d %s:%d\n", ds->my_rank, __FILE__,__LINE__);
1635
current_var = ds->vars;
1636
var_to_sort = current_var;
1637
while(var_to_sort) {
1639
for(j = 0; j < gp->vars_count; j ++) {
1640
if(vars[j] == NULL) {
1641
vars[j] = var_to_sort;
1642
current_var = current_var->next;
1643
var_to_sort = current_var;
1646
else if(vars[j]->id < var_to_sort->id) {
1648
datatap_var_info *tmp;
1650
vars[j] = var_to_sort;
1656
// now the vars list is in descendent order
1657
gp->var_namelist = (char **) malloc(gp->vars_count * sizeof(char *));
1658
if(!gp->var_namelist) {
1659
adios_error(err_no_memory, "Cannot allocate buffer in adios_read_datatap_gopen_byid()");
1663
fprintf(stderr, "im here rank %d %s:%d\n", ds->my_rank, __FILE__,__LINE__);
1665
for(i = 0; i < gp->vars_count; i ++) {
1666
int index = gp->vars_count - i - 1;
1667
gp->var_namelist[i] = (char *) malloc(strlen(vars[index]->varname) +
1668
strlen(vars[index]->varpath) + 2);
1669
if(!gp->var_namelist[i]) {
1670
adios_error(err_no_memory, "Cannot allocate buffer in adios_read_datatap_gopen_byid()");
1673
// return full path name
1674
// TODO: make sure the size of var_namelist[j] is enough
1675
if(!strcmp(vars[index]->varpath, "/")) {
1676
sprintf(gp->var_namelist[i], "/%s\0\0", vars[index]->varname);
1679
sprintf(gp->var_namelist[i], "%s/%s\0", vars[index]->varpath,
1680
vars[index]->varname);
1683
fprintf(stderr, "im here rank %d %s:%d\n", ds->my_rank, __FILE__,__LINE__);
1685
// here we construct a bitmap of var list and send it to rank 0 so rank 0 knows
1686
// which vars are only avaialble in its local memory
1687
memset(ds->var_bitmap, 0, VAR_BITMAP_SIZE);
1688
current_var = ds->vars_peer;
1689
while(current_var) {
1690
int byte_pos = current_var->id / 8;
1691
int bit_pos = current_var->id % 8;
1692
unsigned char mask = 0x01 << bit_pos;
1693
ds->var_bitmap[byte_pos] = ds->var_bitmap[byte_pos] | mask;
1694
current_var = current_var->next;
1697
fprintf(stderr, "im here rank %d %s:%d\n", ds->my_rank, __FILE__,__LINE__);
1698
// send back to rank 0 so it knows which var is missing on other processes
1699
unsigned char *combined_bitmap = NULL;
1700
int combined_size = ds->comm_size * VAR_BITMAP_SIZE;
1701
if(ds->my_rank == 0) {
1702
combined_bitmap = (unsigned char *) malloc(combined_size);
1703
if(!combined_size) {
1704
adios_error(err_no_memory, "Cannot allocate buffer in adios_read_datatap_gopen_byid()");
1708
MPI_Gather(ds->var_bitmap, VAR_BITMAP_SIZE, MPI_BYTE, combined_bitmap, VAR_BITMAP_SIZE, MPI_BYTE, 0, ds->comm);
1710
fprintf(stderr, "im here rank %d %s:%d\n", ds->my_rank, __FILE__,__LINE__);
1711
if(ds->my_rank == 0) {
1712
// now determine which var is missing on other processes
1713
for(i = 0; i < ds->comm_size; i ++) {
1715
for(j = 0; j < VAR_BITMAP_SIZE; j ++) {
1716
ds->var_bitmap[j] = ds->var_bitmap[j] | combined_bitmap[i*VAR_BITMAP_SIZE+j];
1719
free(combined_bitmap);
1720
// if the corresponding bit is set in ds->var_bitmap, then the var is missing on other processes
1723
fprintf(stderr, "im here rank %d %s:%d\n", ds->my_rank, __FILE__,__LINE__);
1726
if(i == 0) { // the first pg
1727
gp->vars_count = ds->pgs[0].num_vars;
1728
gp->var_namelist = (char **) malloc(gp->vars_count * sizeof(char *));
1729
if(!gp->var_namelist) {
1730
adios_error(err_no_memory, "Cannot allocate buffer in adios_read_datatap_gopen_byid()");
1733
for(j = 0; j < gp->vars_count; j ++) {
1734
gp->var_namelist[j] = (char *) malloc(strlen(ds->pgs[i].vars[j].varname) +
1735
strlen(ds->pgs[i].vars[j].varpath) + 2);
1736
if(!gp->var_namelist[j]) {
1737
adios_error(err_no_memory, "Cannot allocate buffer in adios_read_datatap_gopen_byid()");
1740
// return full path name
1741
// TODO: make sure the size of var_namelist[j] is enough
1742
if(!strcmp(ds->pgs[i].vars[j].varpath, "/")) {
1743
sprintf(gp->var_namelist[j], "/%s\0\0", ds->pgs[i].vars[j].varname);
1746
sprintf(gp->var_namelist[j], "%s/%s\0", ds->pgs[i].vars[j].varpath,
1747
ds->pgs[i].vars[j].varname);
1755
// go over the vars in the ith pg
1756
for(j = 0; j < ds->pgs[i].num_vars; j ++) {
1757
// first, we need to make sure the var is not seen before
1761
if(!strcmp(ds->pgs[i].vars[j].varpath, "/")) {
1762
sprintf(fullname, "/%s\0", ds->pgs[i].vars[j].varname);
1765
sprintf(fullname, "%s/%s\0", ds->pgs[i].vars[j].varpath, ds->pgs[i].vars[j].varname);
1767
for(t = 0; t < k; t++) {
1768
if(!strcmp(fullname, gp->var_namelist[t])) {
1774
if(!is_new) continue;
1776
// now add this var to list
1777
char **temp = gp->var_namelist;
1778
// temp = (char **) realloc(gp->var_namelist, (k+1) * sizeof(char*));
1779
temp = (char **) malloc((k+1) * sizeof(char*));
1781
adios_error(err_no_memory, "Cannot allocate buffer in adios_read_datatap_gopen_byid()");
1785
memcpy(temp, gp->var_namelist, k*sizeof(char *));
1786
free(gp->var_namelist);
1787
gp->var_namelist = temp;
1791
// return full path name
1792
gp->var_namelist[k] = strdup(fullname);
1799
while(i < ds->num_pgs);
1802
fprintf(stderr, "im here rank %d %s:%d\n", ds->my_rank, __FILE__,__LINE__);
1804
gp->attr_namelist = 0;
1806
gp->timestep = ds->f_info->timestep;
1807
gp->lasttimestep = ds->f_info->timestep;
1809
// now we need to wait here until we see all data are ready
1810
pthread_mutex_lock(&(ds->f_info->mutex));
1811
while(queue_size(ds->f_info->dt_queue) != ds->f_info->num_chunks) {
1812
pthread_cond_wait(&(ds->f_info->cond2), &(ds->f_info->mutex));
1814
pthread_mutex_unlock(&(ds->f_info->mutex));
1816
fprintf(stderr, "im here rank %d %s:%d\n", ds->my_rank, __FILE__,__LINE__);
1820
int adios_read_datatap_gclose (ADIOS_GROUP *gp)
1822
fprintf(stderr, "im here %s:%d\n",__FILE__,__LINE__);
1823
// datatap_read_file_data *ds = (datatap_read_file_data *) gp->fp->fh;
1825
free_namelist ((gp->attr_namelist),gp->attrs_count);
1827
for(i = 0; i < gp->vars_count; i ++) {
1828
free(gp->var_namelist[i]);
1830
free(gp->var_namelist);
1831
// free_namelist ((gp->var_namelist),gp->vars_count);
1833
fprintf(stderr, "im here %s:%d\n",__FILE__,__LINE__);
1838
int adios_read_datatap_get_attr (ADIOS_GROUP *gp, const char *attrname,
1839
enum ADIOS_DATATYPES *type,
1840
int *size, void **data)
1842
// TODO: borrowed from dimes
1843
adios_error(err_invalid_read_method, "adios_read_datatap_get_attr is not implemented.");
1845
*type = adios_unknown;
1850
int adios_read_datatap_get_attr_byid (ADIOS_GROUP *gp, int attrid,
1851
enum ADIOS_DATATYPES *type,
1852
int *size, void **data)
1854
// TODO: borrowed from dimes
1855
adios_error(err_invalid_read_method, "adios_read_datatap_get_attr_byid is not implemented.");
1857
*type = adios_unknown;
1862
ADIOS_VARINFO * adios_read_datatap_inq_var (ADIOS_GROUP *gp, const char *varname)
1864
// TODO: usually user will read those variables reperesenting dimensions directly
1865
// error(err_invalid_read_method, "adios_read_datatap_inq_var is not implemented.");
1867
fprintf(stderr, "im here %s:%d\n",__FILE__,__LINE__);
1868
// find the var among all pgs
1869
ADIOS_VARINFO *v = (ADIOS_VARINFO *) malloc(sizeof(ADIOS_VARINFO));
1871
adios_error(err_no_memory, "Cannot allocate buffer in adios_read_datatap_inq_var()");
1874
memset(v, 0, sizeof(ADIOS_VARINFO));
1876
datatap_read_file_data *ds = (datatap_read_file_data *) gp->fp->fh;
1879
datatap_var_info *current_var = ds->vars;
1880
while(current_var) {
1881
if(!compare_var_name(varname, current_var)) {
1886
current_var = current_var->next;
1891
current_var = ds->vars_peer;
1892
while(current_var) {
1893
if(!compare_var_name(varname, current_var)) {
1898
current_var = current_var->next;
1904
v->grpid = gp->grpid;
1906
for(i = 0; i < gp->vars_count; i ++) {
1907
if(!strcmp(gp->var_namelist[i], varname)) {
1908
v->varid = i; // TODO: this may not be cmpatible with BP
1912
v->type = current_var->type;
1913
v->ndim = current_var->ndims;
1914
v->timedim = current_var->time_dim;
1915
if(!v->ndim) { // scalar and string
1916
if(v->timedim != -1) { // scalar with time dimension
1918
v->dims = (uint64_t *) malloc(sizeof(uint64_t));
1920
adios_error(err_no_memory, "Cannot allocate buffer in adios_read_datatap_inq_var()");
1923
v->dims[0] = 1; // TODO: only one timestep in the file
1925
//int value_size = common_read_type_size(v->type, current_var->chunks->data);
1926
int value_size = current_var->data_size;
1927
v->value = malloc(value_size);
1929
adios_error(err_no_memory, "Cannot allocate buffer in adios_read_datatap_inq_var()");
1932
memcpy(v->value, current_var->chunks->data, value_size);
1935
v->dims = (uint64_t *) malloc(v->ndim * sizeof(uint64_t));
1937
adios_error(err_no_memory, "Cannot allocate buffer in adios_read_datatap_inq_var()");
1941
for(k = 0; k < v->ndim; k ++) {
1942
//v->dims[k] = ds->pgs[i].vars[j].global_bounds[k];
1943
v->dims[k] = current_var->chunks->global_bounds[k];
1949
adios_error(err_invalid_varname, "Cannot find var %s\n", varname);
1956
for(i = 0; i < ds->num_pgs; i ++) {
1957
for(j = 0; j < ds->pgs[i].num_vars; j ++) {
1958
// the parameter varname can be full path or just var name, so we
1959
// need to first find it by matching the name
1960
if(!compare_var_name(varname, &(ds->pgs[i].vars[j]))) {
1961
v->grpid = gp->grpid;
1962
v->varid = j; // TODO: this may not be cmpatible with BP
1963
v->type = ds->pgs[i].vars[j].type;
1964
v->ndim = ds->pgs[i].vars[j].ndims;
1965
v->timedim = ds->pgs[i].vars[j].time_dim;
1966
if(!v->ndim) { // scalar
1967
if(v->timedim != -1) { // scalar with time dimension
1969
v->dims = (uint64_t *) malloc(sizeof(uint64_t));
1971
adios_error(err_no_memory, "Cannot allocate buffer in adios_read_datatap_inq_var()");
1974
v->dims[0] = 1; // TODO: only one timestep in the file
1976
int value_size = common_read_type_size(v->type, ds->pgs[i].vars[j].data);
1977
v->value = malloc(value_size);
1979
adios_error(err_no_memory, "Cannot allocate buffer in adios_read_datatap_inq_var()");
1982
memcpy(v->value, ds->pgs[i].vars[j].data, value_size);
1985
v->dims = (uint64_t *) malloc(v->ndim * sizeof(uint64_t));
1987
adios_error(err_no_memory, "Cannot allocate buffer in adios_read_datatap_inq_var()");
1991
for(k = 0; k < v->ndim; k ++) {
1992
v->dims[k] = ds->pgs[i].vars[j].global_bounds[k];
2003
fprintf(stderr, "im here %s:%d\n",__FILE__,__LINE__);
2006
ADIOS_VARINFO * adios_read_datatap_inq_var_byid (ADIOS_GROUP *gp, int varid)
2008
if(varid >= 0 && varid < gp->vars_count) {
2009
fprintf(stderr, "im here %s:%d\n",__FILE__,__LINE__);
2010
return adios_read_datatap_inq_var(gp, gp->var_namelist[varid]);
2013
adios_error(err_invalid_varid, "Cannot find var %d\n", varid);
2018
void adios_read_datatap_free_varinfo (ADIOS_VARINFO *vp)
2022
if(!vp->ndim) { // scalar
2023
if(vp->timedim != -1) { // scalar with time dimension
2033
int64_t adios_read_datatap_read_var (ADIOS_GROUP *gp, const char *varname,
2034
const uint64_t *start, const uint64_t *count,
2037
fprintf(stderr, "im here read var %s addr %p %s:%d\n", varname, data,__FILE__,__LINE__);
2039
datatap_read_file_data *ds = (datatap_read_file_data *) gp->fp->fh;
2042
fprintf(stderr, "im here rank %d %s %s:%d\n", ds->my_rank,varname, __FILE__,__LINE__);
2043
datatap_var_info *current_var = ds->vars;
2044
while(current_var) {
2045
fprintf(stderr, "im here rank %d %s %s %s %s:%d\n", ds->my_rank, varname, current_var->varname,current_var->varpath,__FILE__,__LINE__);
2046
if(!compare_var_name(varname, current_var)) {
2047
fprintf(stderr, "im here rank %d %s %s %s %s:%d\n", ds->my_rank, varname, current_var->varname,current_var->varpath,__FILE__,__LINE__);
2050
if(!current_var->ndims) { // scalar
2051
fprintf(stderr, "im here rank %d %s %s %s %s:%d\n", ds->my_rank, varname, current_var->varname,current_var->varpath,__FILE__,__LINE__);
2052
// TODO: check time dimension if there is any
2053
if(current_var->time_dim != -1 &&
2054
(gp->fp->tidx_start != start[0] || count[0] > 1)) {
2055
// TODO: check time dimension if there is any
2056
adios_error(err_no_data_at_timestep, "Specified time step is not available.");
2060
total_size = current_var->data_size;
2061
memcpy(data, current_var->chunks->data, total_size);
2065
fprintf(stderr, "im here rank %d %s:%d\n", ds->my_rank, __FILE__,__LINE__);
2066
// TODO: check time dimension if there is any
2067
if(current_var->time_dim != -1) {
2068
uint64_t ti = current_var->time_dim;
2069
if(futils_is_called_from_fortran()) {
2072
// TODO: in Fortran index starts from 1 but in C index starts from 0
2073
if(count[ti] > 1 || start[ti] != current_var->chunks->global_offsets[ti]) {
2074
adios_error(err_no_data_at_timestep, "Specified time step is not available.");
2079
fprintf(stderr, "im here rank %d %s:%d\n", ds->my_rank, __FILE__,__LINE__);
2080
total_size = read_array(ds, current_var, start, count, data);
2081
fprintf(stderr, "im here rank %d %s:%d\n", ds->my_rank, __FILE__,__LINE__);
2083
// rank 0 should be responsible for send data out to peer readers
2084
if(ds->comm_size > 1) {
2085
if(ds->my_rank == 0) {
2086
// check if this array is missing on peer readers
2087
//if(current_var->num_chunks == 1) {
2088
int byte_pos = current_var->id / 8;
2089
int bit_pos = current_var->id % 8;
2090
unsigned char mask = 0x01 << bit_pos;
2091
//if(current_var->num_chunks == 1) {
2092
if((ds->var_bitmap[byte_pos] & mask) != 0x00) {
2093
fprintf(stderr, "im here rank %d %s:%d\n", ds->my_rank, __FILE__,__LINE__);
2094
int rc = MPI_Bcast(data, total_size, MPI_BYTE, 0, ds->comm);
2095
if(rc != MPI_SUCCESS) {
2096
fprintf(stderr, "rank %d: MPI_Bcast() returns error (%d). %s:%d\n",
2097
ds->my_rank, rc, __FILE__, __LINE__);
2107
current_var = current_var->next;
2108
fprintf(stderr, "im here rank %d %s:%d\n", ds->my_rank, __FILE__,__LINE__);
2111
fprintf(stderr, "im here rank %d %s:%d\n", ds->my_rank, __FILE__,__LINE__);
2114
current_var = ds->vars_peer;
2115
while(current_var) {
2116
if(!compare_var_name(varname, current_var)) {
2117
// found it on remote peer reader
2118
if(!current_var->ndims) { // scalar
2119
// TODO: check time dimension if there is any
2120
if(current_var->time_dim != -1 &&
2121
(gp->fp->tidx_start != start[0] || count[0] > 1)) {
2122
adios_error(err_no_data_at_timestep, "Specified time step is not available.");
2126
total_size = current_var->data_size;
2127
memcpy(data, current_var->chunks->data, total_size);
2131
// TODO: check time dimension if there is any
2132
if(current_var->time_dim != -1) {
2133
uint64_t ti = current_var->time_dim;
2134
// TODO: in Fortran index starts from 1 but in C index starts from 0
2135
if(futils_is_called_from_fortran()) {
2139
if(count[ti] > 1 || start[ti] != current_var->chunks->global_offsets[ti]) {
2140
adios_error(err_no_data_at_timestep, "Specified time step is not available.");
2145
if(ds->my_rank != 0) {
2147
total_size = common_read_type_size(current_var->type, NULL);
2148
for(; i < current_var->ndims; i ++) {
2149
total_size *= count[i];
2152
if(ds->comm_size > 1) {
2153
fprintf(stderr, "im here rank %d %s %ld %s:%d\n", ds->my_rank, current_var->varname,total_size, __FILE__,__LINE__);
2154
// check if this array is missing on peer readers
2155
int rc = MPI_Bcast(data, total_size, MPI_BYTE, 0, ds->comm);
2156
fprintf(stderr, "im here rank %d %s %ld %s:%d\n", ds->my_rank, current_var->varname,total_size, __FILE__,__LINE__);
2157
if(rc != MPI_SUCCESS) {
2158
fprintf(stderr, "rank %d: MPI_Bcast() returns error (%d). %s:%d\n",
2159
ds->my_rank, rc, __FILE__, __LINE__);
2162
fprintf(stderr, "im here rank %d %s %ld %s:%d\n", ds->my_rank, current_var->varname,total_size, __FILE__,__LINE__);
2166
fprintf(stderr, "im here read rank %d var %s addr %p total size %ld %s:%d\n", ds->my_rank, varname, data,total_size,__FILE__,__LINE__);
2171
current_var = current_var->next;
2176
fprintf(stderr, "im here rank %d %s:%d\n", ds->my_rank, __FILE__,__LINE__);
2177
adios_error(err_invalid_varname, "Cannot find var %s\n", varname);
2183
// TODO: search through all pgs to find this var
2185
for(p = 0; p < ds->num_pgs; p ++) {
2186
datatap_pg_info *current_pg = &(ds->pgs[p]);
2187
datatap_var_info *var_info = NULL;
2189
for(v = 0; v < current_pg->num_vars; v ++) {
2190
if(!compare_var_name(varname, &(current_pg->vars[v]))) {
2194
if(!current_pg->vars[v].ndims) { // scalar
2195
// TODO: check time dimension if there is any
2196
if(current_pg->vars[v].time_dim != -1 &&
2197
(gp->fp->tidx_start != start[0] || count[0] > 1)) {
2198
adios_error(err_no_data_at_timestep, "Specified time step is not available.");
2202
total_size = common_read_type_size(current_pg->vars[v].type, current_pg->vars[v].data);
2203
memcpy(data, current_pg->vars[v].data, total_size);
2207
// TODO: check time dimension if there is any
2208
if(current_pg->vars[v].time_dim != -1) {
2209
// TODO: in Fortran index starts from 1 but in C index starts from 0
2210
uint64_t ti = current_pg->vars[v].time_dim;
2211
if(count[ti] > 1 || start[ti] != current_pg->vars[v].global_offsets[ti]) {
2212
adios_error(err_no_data_at_timestep, "Specified time step is not available.");
2217
// Datatap batches per-variable reads, so here we only record the buffer address
2218
datatap_var_info *var_info = (datatap_var_info *) malloc(sizeof(datatap_var_info));
2220
adios_error(err_no_memory, "Could not allocate memory for group info");
2223
memcpy(var_info, &(current_pg->vars[v]), sizeof(datatap_var_info));
2224
var_info->local_bounds = (uint64_t *) malloc(var_info->ndims * sizeof(uint64_t));
2225
if(!var_info->local_bounds) {
2226
adios_error(err_no_memory, "Cannot allocate memory for Datatap.");
2229
var_info->global_bounds = (uint64_t *) malloc(var_info->ndims * sizeof(uint64_t));
2230
if(!var_info->global_bounds) {
2231
adios_error(err_no_memory, "Cannot allocate memory for Datatap.");
2234
var_info->global_offsets = (uint64_t *) malloc(var_info->ndims * sizeof(uint64_t));
2235
if(!var_info->global_offsets) {
2236
adios_error(err_no_memory, "Cannot allocate memory for Datatap.");
2242
for(i = 0; i < var_info->ndims; i ++) {
2243
var_info->local_bounds[i] = count[i];
2244
// TODO: it seems that user won't read from N-dimensional array into M-dimension
2245
var_info->global_bounds[i] = current_pg->vars[v].global_bounds[i];
2246
var_info->global_offsets[i] = start[i];
2247
total_size = total_size * count[i];
2249
total_size *= common_read_type_size(var_info->type, NULL);
2250
var_info->data = data;
2251
var_info->data_size = total_size;
2253
// now we add this to the list of vars to read
2254
if(!ds->vars_read) {
2255
ds->vars_read = var_info;
2256
ds->vars_read_tail = var_info;
2257
var_info->next = NULL;
2260
ds->vars_read_tail->next = var_info;
2261
ds->vars_read_tail = var_info;
2262
var_info->next = NULL;
2264
ds->num_vars_read ++;
2270
fprintf(stderr, "im here read var %s addr %p %s:%d\n", varname, data,__FILE__,__LINE__);
2276
adios_error(err_invalid_varname, "Cannot find var %s\n", varname);
2282
int64_t adios_read_datatap_read_var_byid (ADIOS_GROUP *gp, int varid,
2283
const uint64_t *start,
2284
const uint64_t *count,
2287
if(varid >= 0 && varid < gp->vars_count) {
2288
fprintf(stderr, "im here %s:%d\n",__FILE__,__LINE__);
2289
return adios_read_datatap_read_var (gp, gp->var_namelist[varid], start, count, data);
2292
adios_error(err_invalid_varid, "Cannot find var %d\n", varid);
2298
void adios_read_datatap_reset_dimension_order (ADIOS_FILE *fp, int is_fortran)
2301
adios_error(err_invalid_read_method, "adios_read_datatap_reset_dimension_order is not implemented.");