~ubuntu-branches/ubuntu/utopic/adios/utopic

« back to all changes in this revision

Viewing changes to src/read_datatap.c

  • Committer: Package Import Robot
  • Author(s): Alastair McKinstry
  • Date: 2013-12-09 15:21:31 UTC
  • mfrom: (1.1.2)
  • Revision ID: package-import@ubuntu.com-20131209152131-jtd4fpmdv3xnunnm
Tags: 1.5.0-1
* New upstream.
* Standards-Version: 3.9.5
* Include latest config.{sub,guess} 
* New watch file.
* Create libadios-bin for binaries.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
#include "config.h"
2
 
 
3
 
#if NO_DATATAP == 0
4
 
 
5
 
#include <stdio.h>
6
 
#include <stdlib.h>
7
 
#include <string.h>
8
 
#include <unistd.h>
9
 
#include <ffs.h>
10
 
#include <atl.h>
11
 
#include <evpath.h>
12
 
//#include <mpi.h>
13
 
 
14
 
#ifdef _NOMPI
15
 
    /* Sequential processes can use the library compiled with -D_NOMPI */
16
 
#   include "mpidummy.h"
17
 
#define MPI_SUM 0
18
 
#else
19
 
    /* Parallel applications should use MPI to communicate file info and slices of data */
20
 
#   include "mpi.h"
21
 
#endif
22
 
 
23
 
#include <pthread.h>
24
 
#include "adios.h"
25
 
#include "adios_read.h"
26
 
#include "adios_read_hooks.h"
27
 
#include "adios_error.h"
28
 
#include "globals.h"
29
 
 
30
 
#include <sys/queue.h>
31
 
#if HAVE_PORTALS == 1
32
 
#include <thin_portal.h>
33
 
#elif HAVE_INFINIBAND == 1
34
 
#include <thin_ib.h>
35
 
#endif
36
 
 
37
 
#include <sys/socket.h>
38
 
#include <sys/times.h>
39
 
#include <netinet/in.h>
40
 
#include <sys/time.h>
41
 
#include <sys/uio.h>
42
 
#include <sys/types.h>
43
 
#include <sys/stat.h>
44
 
#include <fcntl.h>
45
 
#include <ctype.h>
46
 
 
47
 
#include <gen_thread.h>
48
 
//#include "queue.h"
49
 
//#include "get_clock.h"
50
 
//#include "attributes.h"
51
 
//#include "memwatch.h"
52
 
#ifdef DMALLOC
53
 
#include "dmalloc.h"
54
 
#endif
55
 
 
56
 
 
57
 
#define DT_MAX_QUEUE_LENGTH 512 
58
 
 
59
 
typedef struct _Queue_Item
60
 
{
61
 
    uint32_t length;
62
 
    char * data;
63
 
    FMStructDescList var_list;
64
 
    int32_t rank;     // rank of client from which the chunk comes
65
 
} QueueItem;
66
 
 
67
 
typedef struct _datatap_var_chunk
68
 
{
69
 
    int rank;
70
 
    void *data;
71
 
    uint64_t *local_bounds; // ndims
72
 
    uint64_t *global_bounds; // ndims
73
 
    uint64_t *global_offsets; // ndims
74
 
    struct _datata_var_chunk *next;
75
 
} datatap_var_chunk, *datatap_var_chunk_p;
76
 
 
77
 
typedef struct _datatap_var_info
78
 
{
79
 
    int id;
80
 
    char *varname;
81
 
    char *varpath;
82
 
    enum ADIOS_DATATYPES type;
83
 
    uint64_t data_size;   
84
 
    int time_dim; // -1 means no time dimension
85
 
    int ndims;
86
 
    int num_chunks;
87
 
    datatap_var_chunk *chunks;
88
 
    struct _datatap_var_info *next;
89
 
} datatap_var_info, *datatap_var_info_p;
90
 
 
91
 
 
92
 
//typedef struct _datatap_pg_info
93
 
//{
94
 
//    int rank;    
95
 
//    int num_vars;    
96
 
//    datatap_var_info *vars;    
97
 
//} datatap_pg_info, *datatap_pg_info_p;
98
 
 
99
 
#define VAR_BITMAP_SIZE 16
100
 
 
101
 
// TODO
102
 
typedef struct _datatap_read_file_data
103
 
{
104
 
    char *file_name;
105
 
    char *group_name; // TODO: assume one group in file
106
 
    file_info *f_info;
107
 
//    int timestep;    // TODO: it's already in file_info
108
 
    int num_vars;
109
 
    datatap_var_info *vars;
110
 
//    FMStructDescList var_list; // TODO
111
 
 
112
 
    // TODO: replicated meta-data info from peer readers
113
 
    // it is a list of vars which is not present locally
114
 
    int num_vars_peer;
115
 
    datatap_var_info *vars_peer;
116
 
 
117
 
    MPI_Comm comm;
118
 
    int my_rank;
119
 
    int comm_size;
120
 
 
121
 
    int host_language_fortran; // 1 for Fortran; 0 for C
122
 
 
123
 
    char var_bitmap[VAR_BITMAP_SIZE]; // 128 bit for 128 var
124
 
 
125
 
//    int num_vars_read;    
126
 
//    datatap_var_info *vars_read;    
127
 
//    datatap_var_info *vars_read_tail;    
128
 
} datatap_read_file_data, *datatap_read_file_data_p;
129
 
 
130
 
typedef struct _datatap_read_method_data
131
 
{
132
 
    pthread_t dt_server_thread;
133
 
//    Queue *dt_queue;
134
 
//    uint32_t dt_queue_max_length;
135
 
//    pthread_mutex_t mutex;
136
 
//    pthread_cond_t cond1;
137
 
//    pthread_cond_t cond2;
138
 
    MPI_Comm dt_comm;
139
 
    int dt_comm_rank;
140
 
    int dt_comm_size;
141
 
    CManager dt_cm;
142
 
    IOhandle *dt_handle;
143
 
    int dt_server_ready; 
144
 
//    int dt_server_stop; 
145
 
    int num_io_dumps;    // TODO: timestep
146
 
} datatap_read_method_data, *datatap_read_method_data_p;
147
 
 
148
 
 
149
 
// this sructure holds all global data for Datatap Read method
150
 
datatap_read_method_data *dt_read_data = NULL;
151
 
 
152
 
// compare used-providd varname with the full path name of variable v
153
 
// return zero if matches and non-zero otherwise
154
 
static int compare_var_name (char *varname, datatap_var_info *v) 
155
 
{
156
 
    if (varname[0] == '/') { // varname is full path
157
 
        char fullpath[256];
158
 
        if(!strcmp(v->varpath, "/")) {
159
 
            sprintf(fullpath, "/%s\0", v->varname);  
160
 
        } 
161
 
        else {   
162
 
            sprintf(fullpath, "%s/%s\0", v->varpath, v->varname);  
163
 
        }
164
 
        return strcmp(fullpath, varname);
165
 
    }
166
 
    else { // varname doesn't include path
167
 
        return strcmp(v->varname, varname);
168
 
    }
169
 
}
170
 
 
171
 
static FMField *find_field (char *name, char *path, FMFieldList flist)
172
 
{
173
 
    char *temp_name;
174
 
    char *full_path_name = get_full_path_name(name, path);
175
 
    temp_name = getFixedName(full_path_name);
176
 
    free(full_path_name);
177
 
    FMField *f = flist;
178
 
    while (f->field_name != NULL) {
179
 
        if(!strcmp(temp_name, f->field_name)) {
180
 
            free(temp_name);
181
 
            return f;
182
 
        }
183
 
        else {
184
 
            f++;
185
 
        }
186
 
#if 0
187
 
        char *name_pos = f->field_name + (strlen(f->field_name) - strlen(name));
188
 
        if(!strncmp(path, f->field_name, strlen(path)) &&
189
 
           !strcmp(name, name_pos)) {
190
 
            return f;
191
 
        }
192
 
        else {
193
 
            f++;
194
 
        }
195
 
#endif
196
 
    }
197
 
    free(temp_name);
198
 
    return f;
199
 
}
200
 
 
201
 
int64_t read_array(datatap_read_file_data *ds, datatap_var_info *var, 
202
 
                    uint64_t *start, uint64_t *count, void *data)
203
 
{
204
 
    int type_size = common_read_type_size(var->type, NULL);
205
 
fprintf(stderr, "im here type_size %d %s:%d\n", type_size, __FILE__,__LINE__);
206
 
    int64_t total_size = 0;
207
 
 
208
 
    // go over the whole list of data chunks and reorganize arrays
209
 
    Queue *data_q = ds->f_info->dt_queue;
210
 
 
211
 
    // TODO: we should be able to access the queue since the dt server will not
212
 
    // touch it any more
213
 
    ListElmt *current_chunk = list_head(data_q);
214
 
    while(current_chunk) {
215
 
        QueueItem *qi = (QueueItem *)current_chunk->data;  
216
 
        FMFieldList filed_list = qi->var_list->field_list; // TODO
217
 
        
218
 
        // first, find the var   
219
 
        FMField *f = find_field(var->varname, var->varpath, filed_list);
220
 
 
221
 
        if(!f) { 
222
 
            // actually this will not happen because the filed_list will
223
 
            // contain all vars even though some may not be written
224
 
            current_chunk = current_chunk->next;
225
 
            continue;
226
 
        }
227
 
 
228
 
        char *source_addr = (char *)qi->data + f->field_offset; // this is the address
229
 
        void *source_addr2;
230
 
  
231
 
        // here we need to distinguish static and dynamic arrays
232
 
        int dim_are_nums = 1;
233
 
 
234
 
        {
235
 
            char *dim_start = strchr(f->field_type, '[');
236
 
            char *dim_end = strrchr(f->field_type, ']');
237
 
            while(dim_start < dim_end) {
238
 
                if(*dim_start == '[' || *dim_start == ']') {
239
 
                    dim_start ++;
240
 
                    continue;
241
 
                } else if(!isdigit(*dim_start)) {
242
 
                    dim_are_nums = 0;
243
 
                    break;
244
 
                }
245
 
                else {
246
 
                    dim_start ++;
247
 
                    continue;
248
 
                }
249
 
            }
250
 
        }
251
 
 
252
 
        if(dim_are_nums) {
253
 
            source_addr2 = source_addr;
254
 
        } else {
255
 
            switch(var->type) {
256
 
                case adios_byte:
257
 
                case adios_unsigned_byte:
258
 
                    source_addr2 = *((char **)source_addr);
259
 
                    break;
260
 
 
261
 
                case adios_string: // TODO
262
 
                    return strlen((char *)source_addr) + 1;
263
 
                    //source_addr2 = *((char **)source_addr);
264
 
                    //break;
265
 
 
266
 
                case adios_short:
267
 
                case adios_unsigned_short:
268
 
                    source_addr2 = *((short int **)source_addr);
269
 
                    break;
270
 
 
271
 
                case adios_integer:
272
 
                case adios_unsigned_integer:
273
 
                    source_addr2 = *((int **)source_addr);
274
 
                    break;
275
 
 
276
 
                case adios_long:
277
 
                case adios_unsigned_long:
278
 
                    source_addr2 = *((long int **)source_addr);
279
 
                    break;
280
 
 
281
 
                case adios_real:
282
 
                    source_addr2 = *((float **)source_addr);
283
 
                    break;
284
 
 
285
 
                case adios_double:
286
 
                    source_addr2 = *((double **)source_addr);
287
 
                    break;
288
 
 
289
 
                case adios_long_double:
290
 
                    source_addr2 = *((long double **)source_addr);
291
 
                    break;
292
 
 
293
 
                case adios_complex:
294
 
                case adios_double_complex:
295
 
                default:
296
 
                    adios_error(err_invalid_read_method, "complex data type is not supported.");
297
 
                    return -1;
298
 
            }
299
 
        }
300
 
 
301
 
        // find the array dimension info 
302
 
        int j;
303
 
        datatap_var_chunk *chunk = var->chunks; 
304
 
        while(chunk) {
305
 
            if(chunk->rank == qi->rank) {
306
 
                break;
307
 
            } 
308
 
            else {
309
 
                chunk = chunk->next;
310
 
            }
311
 
        }
312
 
 
313
 
        if(!chunk) { // this chunk doesn't contain the var so skip it
314
 
            current_chunk = current_chunk->next;
315
 
            continue;
316
 
        }
317
 
 
318
 
        // now we copy data into user's read buffer
319
 
        uint64_t global_start_r = 0;
320
 
        for(j = var->ndims-1; j >= 0; j --) {
321
 
            global_start_r *= chunk->global_bounds[j];
322
 
            global_start_r += start[j];
323
 
        }
324
 
 
325
 
 
326
 
        total_size = type_size;
327
 
        for(j = var->ndims-1; j >= 0; j --) {
328
 
            total_size *= count[j];
329
 
        }
330
 
 
331
 
        for(j = 0; j < var->ndims; j ++) {
332
 
            int lower = chunk->global_offsets[j] - start[j];
333
 
            int higher = (start[j]+count[j]) - (chunk->global_offsets[j]+chunk->local_bounds[j]);
334
 
            if(lower < 0 || higher < 0) {
335
 
                // this means this chunk has some data which is not needed by this reader
336
 
                // TODO: for pixie3d case, this will not happen
337
 
                adios_error(err_invalid_read_method, "Datatap cannot support this decomposition.\n");
338
 
                return -1;
339
 
            }
340
 
        }
341
 
 
342
 
        // find the smallest slice
343
 
        int min_dim = 0;
344
 
        uint64_t stride_size = type_size;
345
 
        for(j = 0; j < var->ndims; j ++) {
346
 
            int lower = chunk->global_offsets[j] - start[j];
347
 
            int higher = (start[j]+count[j]) - (chunk->global_offsets[j]+chunk->local_bounds[j]);
348
 
            if(lower == 0 && higher == 0) {
349
 
                // this means this dimnesion fits, let's move on to the next dimension
350
 
                stride_size *= chunk->local_bounds[j];
351
 
                min_dim ++; 
352
 
                continue;
353
 
            }
354
 
            else if((lower > 0 && higher >= 0) ||
355
 
                    (lower >= 0 && higher > 0)) {
356
 
                // this means this dimension is strided. this dimension is the highest possible 
357
 
                // dimension
358
 
                break;
359
 
            }
360
 
        }
361
 
 
362
 
        uint64_t num_strides = 1;
363
 
        for(j = min_dim; j < var->ndims; j ++) {
364
 
            num_strides *= chunk->local_bounds[j];
365
 
        }
366
 
 
367
 
        uint64_t current_pos[var->ndims]; // track current stride's starting global offset
368
 
        for(j = 0; j < var->ndims; j ++) {
369
 
            current_pos[j] = chunk->global_offsets[j];
370
 
        }
371
 
        uint64_t k = 0;       
372
 
        char *stride_start_addr = source_addr2; 
373
 
        int should_stop = 0;
374
 
        for(; k < num_strides; k ++) {
375
 
            // calculate the coordinates within the bounding box
376
 
            uint64_t my_start = 0;
377
 
            for(j = var->ndims-1; j >= 0; j --) {
378
 
                my_start *= count[j];
379
 
                my_start += (current_pos[j] - chunk->global_offsets[j]);
380
 
            } 
381
 
            char *position = (char *)data + my_start * type_size;
382
 
 
383
 
            memcpy(position, stride_start_addr, stride_size);
384
 
 
385
 
            // now advance to copy next stride
386
 
            stride_start_addr += stride_size; 
387
 
            should_stop = 0;
388
 
            for(j = min_dim; j < var->ndims; j ++) {
389
 
                if(!should_stop) {
390
 
                    if(current_pos[j] == chunk->global_offsets[j]+chunk->local_bounds[j]-1) {
391
 
                        // don't set should_stop so we move on to next dimension
392
 
                        current_pos[j] = chunk->global_offsets[j];
393
 
                    }
394
 
                    else {
395
 
                        current_pos[j] ++;
396
 
                        should_stop = 1;
397
 
                    }
398
 
                } 
399
 
                else {
400
 
                    break;
401
 
                }
402
 
            }
403
 
        }  
404
 
 
405
 
#if 0
406
 
        for(j = var->ndims-1; j >= 0; j --) {
407
 
            // for each sub-chunk, test if it falls into the reading region
408
 
            int lower = (chunk->global_offsets[j] >= start[j]) ? 1: 0;
409
 
            int higher = ((chunk->global_offsets[j]+chunk->local_bounds[j]) <=
410
 
                 (start[j]+count[j])) ? 1: 0;
411
 
            if(lower && higher) {
412
 
                if(j == 0) { // copy the whole chunks
413
 
                    uint64_t my_start = 0;
414
 
                    uint64_t my_size = type_size;
415
 
                    int k;
416
 
                    for(k = var->ndims-1; k >= 0; k --) {
417
 
                        my_start *= chunk->global_bounds[k];
418
 
                        my_start += chunk->global_offsets[k];
419
 
                        my_size *= chunk->local_bounds[k];
420
 
                    }
421
 
                    
422
 
                    int y, z;
423
 
                    for(z = 0; z < ; z ++) {
424
 
                        uint64_t z_offset = pix_record->zoffset + k;
425
 
                        for(y = 0; y < pix_record->ysize; y ++) {
426
 
                            unsigned long long y_offset = pix_record->yoffset + j;
427
 
                            unsigned long long position = z_offset * pix_record->nyd_plus_2 * pix_record->nxd_plus_2 +
428
 
                               y_offset * pix_record->nxd_plus_2 + pix_record->xoffset;
429
 
                            position -= p_data->start_pos;
430
 
                            memcpy(&(p_data->buffers[m].buffer[position]), v, pix_record->xsize * sizeof(double));
431
 
                        }
432
 
                    }
433
 
 
434
 
 
435
 
                    char *position = (char *)data + (my_start - global_start_r) * type_size;
436
 
                    memcpy(position, source_addr2, my_size);
437
 
 
438
 
 
439
 
fprintf(stderr, "%d rank=%d name = %s position=%p data=%p mystart = %lu global_start_r = %lu mysize = %lu %s:%d\n", ds->my_rank,chunk->rank,var->varname,position,data,my_start, global_start_r, my_size, __FILE__,__LINE__);
440
 
                    total_size += my_size;
441
 
fprintf(stderr, "im here %d %s:%d\n", ds->my_rank,__FILE__,__LINE__);
442
 
                    break;
443
 
                }
444
 
                else {
445
 
fprintf(stderr, "im here %s:%d\n", __FILE__,__LINE__);
446
 
                    // TODO
447
 
                    continue;
448
 
                }
449
 
            }
450
 
            else {
451
 
                // TODO
452
 
                adios_error(err_invalid_read_method, "Datatap cannot support this decomposition.\n");
453
 
                return -1;
454
 
            }
455
 
        }
456
 
        // we are done with this chunk, now extract data from next chunk
457
 
#endif
458
 
        
459
 
 
460
 
        current_chunk = current_chunk->next;
461
 
    }
462
 
fprintf(stderr, "im here read rank %d var %s total size %ld %s:%d\n", ds->my_rank, var->varname, total_size,__FILE__,__LINE__);
463
 
 
464
 
    // TODO: the data size is messed up because of ghost zone
465
 
    total_size = type_size;
466
 
    int j;
467
 
    for(j = var->ndims-1; j >= 0; j --) {
468
 
        total_size *= count[j]; 
469
 
    }
470
 
    return total_size;       
471
 
}
472
 
 
473
 
#if 0
474
 
int reorganize_array (QueueItem *qi, datatap_read_file_data *ds)
475
 
{
476
 
fprintf(stderr, "im here rank %d %s:%d\n",dt_read_data->dt_comm_rank,__FILE__,__LINE__);
477
 
    // go through all variables user wants to read
478
 
    datatap_pg_info *pg = NULL;
479
 
    int i;
480
 
    for(i = 0; i < ds->num_pgs; i ++) {
481
 
        if(ds->pgs[i].rank == qi->rank) {
482
 
            pg = &(ds->pgs[i]);                   
483
 
            break;
484
 
        }
485
 
    }
486
 
    if(!pg) {
487
 
        adios_error(err_unspecified, "cannot find pg.\n");
488
 
        return -1;        
489
 
    }
490
 
    
491
 
fprintf(stderr, "im here rank %d %s:%d\n",dt_read_data->dt_comm_rank,__FILE__,__LINE__);
492
 
    datatap_var_info *var = ds->vars_read;
493
 
    FMFieldList filed_list = qi->var_list->field_list; // TODO
494
 
    while(var) {
495
 
        // first search within the pg for this var        
496
 
fprintf(stderr, "im here rank %d var name %s %s:%d\n",dt_read_data->dt_comm_rank,var->varname,__FILE__,__LINE__);
497
 
        datatap_var_info *v = NULL;
498
 
        for(i = 0; i < pg->num_vars; i ++) {
499
 
fprintf(stderr, "im here rank %d var name %s i %d %s:%d\n",dt_read_data->dt_comm_rank,var->varname,i,__FILE__,__LINE__);
500
 
            //if(!strcmp(pg->vars[i].varname, var->varname)) {
501
 
            if(!compare_var_name(var->varname, &(pg->vars[i]))) {
502
 
fprintf(stderr, "im here rank %d var name %s i %d %s:%d\n",dt_read_data->dt_comm_rank,var->varname,i,__FILE__,__LINE__);
503
 
                // got it
504
 
                v = &(pg->vars[i]);
505
 
fprintf(stderr, "im here rank %d var name %s i %d %s:%d\n",dt_read_data->dt_comm_rank,var->varname,i,__FILE__,__LINE__);
506
 
                 
507
 
                int type_size = common_read_type_size(v->type, NULL); 
508
 
fprintf(stderr, "im here rank %d var name %s i %d %s:%d\n",dt_read_data->dt_comm_rank,var->varname,i,__FILE__,__LINE__);
509
 
                FMField *f = find_field(v->varname, v->varpath, filed_list);
510
 
fprintf(stderr, "im here rank %d var name %s i %d %s:%d\n",dt_read_data->dt_comm_rank,var->varname,i,__FILE__,__LINE__);
511
 
                char *source_addr = (char *)qi->data + f->field_offset; // this is the address
512
 
                void *source_addr2;
513
 
fprintf(stderr, "im here rank %d var name %s i %d %s:%d\n",dt_read_data->dt_comm_rank,var->varname,i,__FILE__,__LINE__);
514
 
                switch(v->type) {
515
 
                    case adios_byte:
516
 
                    case adios_unsigned_byte:
517
 
                        source_addr2 = *((char **)source_addr);
518
 
                        break;
519
 
          
520
 
                    case adios_string: // TODO
521
 
                        return strlen((char *)source_addr) + 1;
522
 
                        source_addr2 = *((char **)source_addr);
523
 
                        break;
524
 
         
525
 
                    case adios_short:
526
 
                    case adios_unsigned_short:
527
 
                        source_addr2 = *((short int **)source_addr);
528
 
                        break;
529
 
        
530
 
                    case adios_integer:
531
 
                    case adios_unsigned_integer:
532
 
                        source_addr2 = *((int **)source_addr);
533
 
                        break;
534
 
        
535
 
                    case adios_long:
536
 
                    case adios_unsigned_long:
537
 
                        source_addr2 = *((long int **)source_addr);
538
 
                        break;
539
 
        
540
 
                    case adios_real:
541
 
                        source_addr2 = *((float **)source_addr);
542
 
                        break;
543
 
        
544
 
                    case adios_double:
545
 
                        source_addr2 = *((double **)source_addr);
546
 
                        break;
547
 
        
548
 
                    case adios_long_double:
549
 
                        source_addr2 = *((long double **)source_addr);
550
 
                        break;
551
 
        
552
 
                    case adios_complex:
553
 
                    case adios_double_complex:
554
 
                    default:
555
 
                        adios_error(err_invalid_read_method, "complex data type is not supported.");
556
 
                        return -1;
557
 
                }
558
 
                //memcopy(var->read_buffer, source_addr2, var->read_buffer_size);
559
 
                                      
560
 
                // now we need to calculate how to map chunks into read buffer
561
 
                // according to dimension specification
562
 
                // also we need to figure out which part should be shuffled to
563
 
                // other peer processes and which part should be moved in from
564
 
                // other processes
565
 
                // search along the slowest changing dimension and determine if 
566
 
                // the sub-chunk falls into the reading region. If so, copy the 
567
 
                // whole sub-chunk, otherwise search along the second slowest
568
 
                // changing dimension within that subchunk, identify which part
569
 
                // should go where and copy the remaining part to local buffer
570
 
fprintf(stderr, "im here rank %d var name %s i %d %s:%d\n",dt_read_data->dt_comm_rank,var->varname,i,__FILE__,__LINE__);
571
 
                int j;
572
 
                uint64_t global_start_r = 0; 
573
 
                for(j = var->ndims-1; j >= 0; j --) {
574
 
                    if(var->global_bounds[j]) {
575
 
                        global_start_r *= var->global_bounds[j];
576
 
                        global_start_r += var->global_offsets[j];
577
 
                    }
578
 
                }
579
 
                
580
 
if(!strcmp(v->varname,"bconds")){
581
 
    int *bconds = (int *)source_addr2;
582
 
    int t;
583
 
    for(t=0;t<48;t++) {
584
 
        fprintf(stderr, "bconds[%d]=%d\n",t,bconds[t]);
585
 
    }
586
 
 
587
 
}
588
 
 
589
 
if(!strcmp(v->varname,"v1")){
590
 
    double *v1 = (double *)source_addr2;
591
 
    int t;
592
 
    for(t=0;t<100;t++) {
593
 
        fprintf(stderr, "v1[%d]=%f\n",t,v1[t]);
594
 
    }
595
 
 
596
 
}
597
 
 
598
 
fprintf(stderr, "im here rank %d var name %s i %d %s:%d\n",dt_read_data->dt_comm_rank,var->varname,i,__FILE__,__LINE__);
599
 
                for(j = v->ndims-1; j >= 0; j --) {
600
 
                    // for each sub-chunk, test if it falls into the reading region
601
 
                    int lower = (v->global_offsets[j] >= var->global_offsets[j]) ? 1: 0;
602
 
                    int higher = ((v->global_offsets[j]+v->local_bounds[j]) <= 
603
 
                         (var->global_offsets[j]+var->local_bounds[j])) ? 1: 0; 
604
 
fprintf(stderr, "im here rank=%d j %d name %s go %lu lb %lu go %lu lb %lu %s:%d\n", dt_read_data->dt_comm_rank,j,var->varname,v->global_offsets[j],v->local_bounds[j],var->global_offsets[j],var->local_bounds[j],__FILE__,__LINE__);
605
 
                    if(lower && higher) { 
606
 
fprintf(stderr, "im here j %d %s:%d\n", j,__FILE__,__LINE__);
607
 
                        if(j == 0) { // copy the whole chunks
608
 
                            uint64_t my_start = 0;
609
 
                            uint64_t my_size = type_size;
610
 
                            int k;
611
 
                            for(k = v->ndims-1; k >= 0; k --) {
612
 
                                my_start *= v->global_bounds[k];
613
 
                                my_start += v->global_offsets[k];
614
 
                                my_size *= v->local_bounds[k];  
615
 
                            }
616
 
                            char *position = (char *)var->data + (my_start - global_start_r) * type_size;                            
617
 
fprintf(stderr, "rank=%d name = %s position=%lu data=%lu mystart = %lu global_start_r = %lu mysize = %lu %s:%d\n", dt_read_data->dt_comm_rank,var->varname,position,var->data,my_start, global_start_r, my_size, __FILE__,__LINE__);
618
 
                            memcpy(position, source_addr2, my_size);      
619
 
fprintf(stderr, "im here j %d %s:%d\n", j,__FILE__,__LINE__);
620
 
                            break;
621
 
                        }
622
 
                        else {
623
 
                            // TODO 
624
 
                            continue; 
625
 
                        }
626
 
                    }
627
 
                    else {
628
 
fprintf(stderr, "im here rank=%d j %d name %s go %lu lb %lu go %lu lb %lu %s:%d\n", dt_read_data->dt_comm_rank,j,var->varname,v->global_offsets[j],v->local_bounds[j],var->global_offsets[j],var->local_bounds[j],__FILE__,__LINE__);
629
 
fprintf(stderr, "im here j %d %s:%d\n", j,__FILE__,__LINE__);
630
 
                        // TODO
631
 
                        adios_error(err_invalid_read_method, "Datatap cannot support this decomposition.\n");
632
 
                        return -1;
633
 
                    } 
634
 
fprintf(stderr, "im here j %d %s:%d\n", j,__FILE__,__LINE__);
635
 
                }
636
 
                break;
637
 
                // end of data re-organization for this var
638
 
            } 
639
 
        }             
640
 
        
641
 
        // now process the next var to read        
642
 
        var = var->next;
643
 
    }    
644
 
fprintf(stderr, "im here rank %d %s:%d\n",dt_read_data->dt_comm_rank,__FILE__,__LINE__);
645
 
 
646
 
    return 0;
647
 
}
648
 
#endif
649
 
 
650
 
void data_handler (void *data, int length, void *user_data, attr_list attr, int rank, void *timing, file_info *f)
651
 
{
652
 
    recvtime *r = (recvtime*)timing;
653
 
    IOhandle *h = (IOhandle*)user_data;
654
 
    elapsedtime *e = updateTimes(h, r, length);
655
 
    
656
 
    // decode the data and insert the data into the queue
657
 
    int decoded_length = FFS_est_decode_length(h->iocontext, data, length);
658
 
 
659
 
    // TODO: make sure we free this later (after writing to hdf5 file)
660
 
    char *decoded_data = (char *)malloc(decoded_length);
661
 
 
662
 
    if(!decoded_data) {
663
 
        adios_error(err_no_memory, "Cannot allocate memory for Datatap.");
664
 
        exit(-1);
665
 
    }
666
 
 
667
 
    FFSTypeHandle ffshandle = FFSTypeHandle_from_encode(h->iocontext, data);
668
 
    FMFormat form = FMFormat_of_original(ffshandle);
669
 
 
670
 
    // TODO
671
 
    FMStructDescList var_list = get_localized_formats(form);
672
 
    establish_conversion(h->iocontext, ffshandle, var_list);
673
 
    FFSdecode_to_buffer(h->iocontext, data, decoded_data);
674
 
 
675
 
    // The encoded data can be recycled now
676
 
    returnbuffer(h, data, length);
677
 
 
678
 
    QueueItem * qi =(QueueItem *) malloc(sizeof(QueueItem));
679
 
    if(!qi) {
680
 
        adios_error(err_no_memory, "Cannot allocate memory for Datatap.");
681
 
        exit(-1);
682
 
    }
683
 
    qi->data = decoded_data;
684
 
    qi->length = decoded_length;
685
 
    qi->var_list = var_list;
686
 
    qi->rank = rank; // TODO: hack it!   
687
 
 
688
 
    // put message in queue
689
 
    pthread_mutex_lock(&(f->mutex));
690
 
//    while(queue_size(f->dt_queue) >= f->dt_queue_max_length) {
691
 
//        // wait until queue is not full
692
 
//        pthread_cond_signal(&(dt_read_data->cond2));
693
 
//        pthread_cond_wait(&(dt_read_data->cond1), &(dt_read_data->mutex));
694
 
//    }
695
 
    queue_enqueue(f->dt_queue, qi);
696
 
    if(queue_size(f->dt_queue) == f->num_chunks) {
697
 
        pthread_cond_signal(&(f->cond2));
698
 
    }
699
 
    pthread_mutex_unlock(&(f->mutex));
700
 
 
701
 
 
702
 
fprintf(stderr, "im here rank= %d client rank=%d data handler done %s:%d\n",h->rank,rank,__FILE__,__LINE__);
703
 
    
704
 
    // TODO: check if it's time to stop
705
 
}
706
 
 
707
 
void * dt_server_thread_func (void *arg)
708
 
{
709
 
fprintf(stderr, "im here %s:%d\n",__FILE__,__LINE__);
710
 
    MPI_Comm orig_comm = (MPI_Comm) arg;
711
 
 
712
 
#if 0 // open-mpi   
713
 
    // duplicate a MPI communicator for synchronization between dt servers
714
 
    int rc = MPI_Comm_dup(orig_comm, &(dt_read_data->dt_comm));
715
 
    if(rc != MPI_SUCCESS) {
716
 
        error(err_unspecified, "Cannot duplicate communicator for Datatap.");
717
 
        pthread_exit(NULL);
718
 
    }
719
 
#else 
720
 
    dt_read_data->dt_comm = orig_comm;
721
 
#endif
722
 
 
723
 
fprintf(stderr, "im here %s:%d\n",__FILE__,__LINE__);
724
 
 
725
 
#ifdef _NOMPI
726
 
    dt_read_data->dt_comm_size = 1;
727
 
    dt_read_data->dt_comm_rank = 0;
728
 
#else 
729
 
    MPI_Comm_size(dt_read_data->dt_comm, &(dt_read_data->dt_comm_size));
730
 
    MPI_Comm_rank(dt_read_data->dt_comm, &(dt_read_data->dt_comm_rank));
731
 
#endif
732
 
  
733
 
fprintf(stderr, "im here %s:%d\n",__FILE__,__LINE__);
734
 
    // initialize ptlpbio interface
735
 
    dt_read_data->dt_cm = CManager_create();
736
 
fprintf(stderr, "im here %s:%d\n",__FILE__,__LINE__);
737
 
    CMlisten_specific(dt_read_data->dt_cm, NULL);
738
 
 
739
 
fprintf(stderr, "im here %s:%d\n",__FILE__,__LINE__);
740
 
    lrand48();
741
 
 
742
 
    dt_read_data->dt_handle = EVthin_portals_listen(dt_read_data->dt_cm, 120,
743
 
                              0, data_handler, dt_read_data->dt_comm);
744
 
 
745
 
    char param_file[30];
746
 
    int appid, was_set;
747
 
    appid = globals_adios_get_application_id(&was_set);
748
 
    if(!was_set) {
749
 
        adios_error(err_unspecified, "Application ID was not set.");
750
 
        sprintf(param_file, "datatap_param\0");
751
 
    }
752
 
    else {
753
 
        sprintf(param_file, "datatap_param%d\0", appid);
754
 
    }
755
 
 
756
 
    // dt server(rank 0) gather contact info from other servers and write to
757
 
    // a file so upstream writers can connect to this application
758
 
    outputConnectionData(param_file, dt_read_data->dt_handle);
759
 
 
760
 
fprintf(stderr, "im here %s:%d\n",__FILE__,__LINE__);
761
 
    dt_read_data->dt_server_ready = 1;
762
 
 
763
 
    // serve the network
764
 
    CMrun_network(dt_read_data->dt_cm);
765
 
 
766
 
    // TODO: cleanup and exit
767
 
    CManager_close(dt_read_data->dt_cm);
768
 
    return NULL;
769
 
}
770
 
 
771
 
int adios_read_datatap_init (MPI_Comm comm)
772
 
{
773
 
fprintf(stderr, "im here %s:%d\n",__FILE__,__LINE__);
774
 
    setenv("CMSelfFormats", "1", 1);
775
 
 
776
 
    // initialize Datatap read method structure
777
 
    dt_read_data = (datatap_read_method_data *) malloc(sizeof(datatap_read_method_data));
778
 
    if(!dt_read_data) {
779
 
        adios_error(err_no_memory, "Cannot allocate memory for Datatap.");
780
 
        return -1;
781
 
    }    
782
 
    memset(dt_read_data, 0, sizeof(datatap_read_method_data));
783
 
 
784
 
    // enable threading for EVPath
785
 
    gen_pthread_init();
786
 
 
787
 
fprintf(stderr, "im here %s:%d\n",__FILE__,__LINE__);
788
 
#if 0
789
 
    // set up queue for incoming data chunks
790
 
    dt_read_data->dt_queue =(Queue *) calloc(1, sizeof(Queue));
791
 
    if(!dt_read_data->dt_queue) {
792
 
        adios_error(err_no_memory, "Cannot allocate memory for Datatap.");
793
 
        return -1;
794
 
    }
795
 
    queue_init(dt_read_data->dt_queue, free);
796
 
    dt_read_data->dt_queue_max_length = DT_MAX_QUEUE_LENGTH;
797
 
 
798
 
    pthread_mutex_init(&(dt_read_data->mutex), NULL);
799
 
    pthread_cond_init(&(dt_read_data->cond1), NULL);
800
 
    pthread_cond_init(&(dt_read_data->cond2), NULL);
801
 
#endif    
802
 
 
803
 
    // fork the thread to poll network 
804
 
    int rc = pthread_create(&(dt_read_data->dt_server_thread), NULL, 
805
 
                            dt_server_thread_func, (void *)comm);
806
 
    if(rc) {
807
 
        adios_error(err_unspecified, "Failed to create Datatap server thread.");
808
 
        return -1; 
809
 
    }
810
 
 
811
 
    // TODO: wait until the dt server thread is ready
812
 
    while(!dt_read_data->dt_server_ready) { }
813
 
    
814
 
fprintf(stderr, "im here %s:%d\n",__FILE__,__LINE__);
815
 
    return 0;
816
 
}
817
 
 
818
 
int adios_read_datatap_finalize ()
819
 
{
820
 
    // notify and wait for dt server thread to exit
821
 
    datatap_stop_server(dt_read_data->dt_handle);
822
 
    pthread_join(dt_read_data->dt_server_thread, NULL);
823
 
 
824
 
#if 0
825
 
    // TODO: we need a datatap cleanup function
826
 
    pthread_mutex_destroy(&(dt_read_data->mutex));
827
 
    pthread_cond_destroy(&(dt_read_data->cond1));
828
 
    pthread_cond_destroy(&(dt_read_data->cond2));    
829
 
    free(dt_read_data->dt_queue);
830
 
#endif
831
 
 
832
 
    free(dt_read_data);
833
 
    return 0;
834
 
}
835
 
 
836
 
ADIOS_FILE *adios_read_datatap_fopen(const char *fname, MPI_Comm comm)
837
 
{
838
 
    ADIOS_FILE *fp;
839
 
    adios_errno = 0;
840
 
 
841
 
    datatap_read_file_data *ds = (datatap_read_file_data *) malloc(sizeof(datatap_read_file_data));
842
 
    if(!ds) {
843
 
        adios_error(err_no_memory, "Cannot allocate memory for Datatap.");
844
 
        return NULL;                
845
 
    }
846
 
    ds->file_name = strdup(fname);    
847
 
     
848
 
    // here we need to syncrhonize with other reader processes to see what they find
849
 
    // 1: file available locally
850
 
    // 0: file not found
851
 
    int my_status, global_status;
852
 
    int total_num_readers, my_rank;
853
 
 
854
 
#ifdef _NOMPI
855
 
    total_num_readers = 1;
856
 
    my_rank = 0;
857
 
#else
858
 
    MPI_Comm_size(comm, &total_num_readers);
859
 
    MPI_Comm_rank(comm, &my_rank);
860
 
#endif
861
 
 
862
 
    ds->comm = comm;
863
 
    ds->my_rank = my_rank;
864
 
    ds->comm_size = total_num_readers;
865
 
 
866
 
    // first we need to make sure the file has been 'written'
867
 
    // TODO: always start from timstep 0
868
 
    int is_EOF = 0;
869
 
    ds->f_info = datatap_get_file_info(dt_read_data->dt_handle, fname, dt_read_data->num_io_dumps, &is_EOF);
870
 
    if(ds->f_info) {
871
 
        // now I should tell my peer readers that I have seen this file available locally
872
 
        my_status = 1;
873
 
        if(total_num_readers > 1) {
874
 
            int rc = MPI_Allreduce(&my_status, &global_status, 1, MPI_INT, MPI_SUM, comm);
875
 
            if(rc != MPI_SUCCESS) {
876
 
                fprintf(stderr, "something bad happened somewhere.\n");
877
 
                free(ds->file_name);
878
 
                free(ds);
879
 
                return NULL;
880
 
            } 
881
 
        }
882
 
        // now I can move on to process meta-data
883
 
fprintf(stderr, "im here rank %d move on %s:%d\n", my_rank, __FILE__,__LINE__);
884
 
    }
885
 
    else {
886
 
        if(is_EOF) {
887
 
            adios_errno = err_end_of_file;
888
 
            adios_error(err_end_of_file, "Reach the end of file (%s).", fname);
889
 
            free(ds->file_name);
890
 
            free(ds);
891
 
    
892
 
            if(total_num_readers > 1) {               
893
 
                // now I should wait for my peer readers to see EOF in their local context
894
 
                int rc = MPI_Barrier(comm);
895
 
                if(rc != MPI_SUCCESS) {
896
 
                    fprintf(stderr, "something bad happened somewhere.\n");
897
 
                }
898
 
                return NULL; 
899
 
            }
900
 
        }
901
 
        else {
902
 
            if(total_num_readers > 1) {
903
 
 
904
 
                // I didn't see the file available in my local context, but my peer readers may
905
 
                // have seen it, so I should ask them to figure out
906
 
fprintf(stderr, "im here rank %d ask %s:%d\n", my_rank, __FILE__,__LINE__);
907
 
                my_status = 0;
908
 
                MPI_Allreduce(&my_status, &global_status, 1, MPI_INT, MPI_SUM, comm);
909
 
                if(global_status == 0) { // no one see file available
910
 
fprintf(stderr, "im here rank %d no file %s:%d\n", my_rank, __FILE__,__LINE__);
911
 
                    adios_errno = err_file_not_found_error;
912
 
                    adios_error(err_file_not_found_error, "Cannot find file (%s).", fname);
913
 
fprintf(stderr, "im here rank %d no file %s:%d\n", my_rank, __FILE__,__LINE__);
914
 
                    free(ds->file_name);
915
 
                    free(ds);
916
 
                    return NULL;
917
 
                }
918
 
                else { 
919
 
                    // some one has seen this file available, so I should keep polling instead of return
920
 
                    do {
921
 
fprintf(stderr, "im here rank %d %s:%d\n", my_rank, __FILE__,__LINE__);
922
 
                        is_EOF = 0;
923
 
                        ds->f_info = datatap_get_file_info(dt_read_data->dt_handle, 
924
 
                            fname, dt_read_data->num_io_dumps, &is_EOF);
925
 
                        if(ds->f_info) { 
926
 
                            // now we get it!
927
 
fprintf(stderr, "im here rank %d %s:%d\n", my_rank, __FILE__,__LINE__);
928
 
                            break;
929
 
                        }
930
 
                        else {
931
 
                            if(is_EOF) {
932
 
                                adios_errno = err_end_of_file;
933
 
                                adios_error(err_end_of_file, "Reach the end of file (%s).", fname);
934
 
                                free(ds->file_name);
935
 
                                free(ds);
936
 
 
937
 
                                int rc = MPI_Barrier(comm);
938
 
                                if(rc != MPI_SUCCESS) {
939
 
                                    fprintf(stderr, "something bad happened somewhere.\n");
940
 
                                }
941
 
                                return NULL;
942
 
                            }
943
 
                            else {
944
 
                                usleep(10000); // TODO: we need to set a proper value
945
 
fprintf(stderr, "im here rank %d %s:%d\n", my_rank, __FILE__,__LINE__);
946
 
                                continue;
947
 
                            }
948
 
                        }  
949
 
                    } 
950
 
                    while (1);
951
 
                }
952
 
            }
953
 
            else {
954
 
                adios_errno = err_file_not_found_error;
955
 
                adios_error(err_file_not_found_error, "Cannot find file (%s).", fname);
956
 
fprintf(stderr, "im here rank %d no file %s:%d\n", my_rank, __FILE__,__LINE__);
957
 
                free(ds->file_name);
958
 
                free(ds);
959
 
                return NULL;
960
 
            }  
961
 
        }
962
 
    }       
963
 
fprintf(stderr, "im here rank %d see file %s:%d\n", my_rank, __FILE__,__LINE__);
964
 
    
965
 
    // we don't know what to read yet
966
 
    ds->num_vars = 0;
967
 
    ds->vars = NULL;
968
 
//    ds->num_vars_read = 0;
969
 
//    ds->vars_read = NULL;
970
 
//    ds->vars_read_tail = NULL;
971
 
 
972
 
    // TODO: add a loop over chunks
973
 
    int i;
974
 
    for(i = 0; i < ds->f_info->num_chunks; i ++) {  
975
 
        chunk_info *current_chunk = &(ds->f_info->chunks[i]);  
976
 
        
977
 
fprintf(stderr, "im here %s:%d\n", __FILE__,__LINE__);
978
 
        // parse the var info
979
 
        char *current_pos = current_chunk->var_info_buffer;
980
 
        int total_var_info_size = *(int *)current_pos; // total size
981
 
        current_pos += 4;
982
 
 
983
 
        if(i == 0) {
984
 
            ds->host_language_fortran = *(enum ADIOS_FLAG *)current_pos; // host language
985
 
        }
986
 
        current_pos += 4;
987
 
 
988
 
        int group_name_len = *(int *)current_pos; // size of group name
989
 
        current_pos += 4; 
990
 
  
991
 
        if(i == 0) { // TODO: let's assume one group name
992
 
            ds->group_name = strdup(current_pos); // group name
993
 
        }
994
 
        current_pos += group_name_len;
995
 
 
996
 
        int num_vars_in_pg = *(int *)current_pos; // total num of vars  
997
 
        current_pos += 4;
998
 
fprintf(stderr, "im here info size %d num vars %d %s:%d\n", total_var_info_size,num_vars_in_pg,__FILE__,__LINE__);
999
 
 
1000
 
        datatap_var_info *current_var;
1001
 
        char *end = current_chunk->var_info_buffer + total_var_info_size;
1002
 
        while(current_pos < end) {
1003
 
            // size of this var info
1004
 
            int var_info_size = *(int *) current_pos;             
1005
 
            current_pos += sizeof(int);        
1006
 
            int var_id = *(int *) current_pos;
1007
 
            current_pos += sizeof(int);        
1008
 
            int varname_len = *(int *) current_pos;        
1009
 
            current_pos += sizeof(int);        
1010
 
            char *varname = current_pos;  
1011
 
            current_pos += varname_len;
1012
 
            int varpath_len = *(int *) current_pos;
1013
 
            current_pos += sizeof(int);
1014
 
            char *varpath = current_pos;
1015
 
            current_pos += varpath_len;
1016
 
 
1017
 
            // now we go through the current list of vars to see if this is new
1018
 
            current_var = ds->vars;
1019
 
            while(current_var != NULL) {
1020
 
                // TODO: compare var id
1021
 
                //if(!strcmp(current_var->varname, varname) && 
1022
 
                //    !strcmp(current_var->varpath, varpath)) {
1023
 
                if(var_id == current_var->id) {
1024
 
                    break;
1025
 
                }
1026
 
                else {
1027
 
                    current_var = current_var->next;
1028
 
                }
1029
 
            }
1030
 
 
1031
 
            if(!current_var) { // this is a new var
1032
 
                current_var = (datatap_var_info *) malloc(sizeof(datatap_var_info));
1033
 
                if(!current_var) {
1034
 
                    adios_error(err_no_memory, "Cannot allocate memory for Datatap.");
1035
 
                    return NULL;
1036
 
                }
1037
 
                current_var->id = var_id;
1038
 
                current_var->varname = strdup(varname);
1039
 
                current_var->varpath = strdup(varpath);
1040
 
                current_var->type = *(enum ADIOS_DATATYPES *) current_pos;
1041
 
                current_pos += sizeof(enum ADIOS_DATATYPES);
1042
 
                current_var->time_dim = *(int *) current_pos;
1043
 
                current_pos += sizeof(int);
1044
 
                current_var->ndims = *(int *) current_pos;
1045
 
                current_pos += sizeof(int);
1046
 
                current_var->num_chunks = 0;
1047
 
                current_var->chunks = NULL;
1048
 
                if(!current_var->ndims) { // scalars and strings
1049
 
                    current_var->data_size = common_read_type_size(current_var->type, current_pos);
1050
 
                }
1051
 
fprintf(stderr, "im here %s %s %d %s:%d\n", current_var->varname, current_var->varpath, current_var->ndims, __FILE__,__LINE__);
1052
 
                current_var->next = ds->vars;
1053
 
                ds->vars = current_var;
1054
 
                ds->num_vars ++;
1055
 
            }
1056
 
            else {
1057
 
                current_pos += sizeof(enum ADIOS_DATATYPES);
1058
 
                //current_var->time_dim = *(int *) current_pos;
1059
 
                current_pos += sizeof(int);
1060
 
                //current_var->ndims = *(int *) current_pos;
1061
 
                current_pos += sizeof(int);
1062
 
            }            
1063
 
            
1064
 
            datatap_var_chunk *new_chunk = (datatap_var_chunk *) malloc(sizeof(datatap_var_chunk));             
1065
 
            if(!new_chunk) {
1066
 
                    adios_error(err_no_memory, "Cannot allocate memory for Datatap.");
1067
 
                    return NULL;
1068
 
            }
1069
 
            new_chunk->next = current_var->chunks;
1070
 
            current_var->chunks = new_chunk;
1071
 
            current_var->num_chunks ++; 
1072
 
 
1073
 
            new_chunk->rank = current_chunk->rank;
1074
 
            if(!current_var->ndims) { // scalars and strings
1075
 
                // copy data value             
1076
 
                new_chunk->data = malloc(current_var->data_size);
1077
 
                if(!new_chunk->data) {
1078
 
                    adios_error(err_no_memory, "Cannot allocate memory for Datatap.");
1079
 
                    return NULL;                                  
1080
 
                }
1081
 
                memcpy(new_chunk->data, current_pos, current_var->data_size);         
1082
 
                current_pos += current_var->data_size;
1083
 
            }
1084
 
            else { // arrays
1085
 
                new_chunk->local_bounds = (uint64_t *) malloc(current_var->ndims * sizeof(uint64_t));  
1086
 
                if(!new_chunk->local_bounds) {
1087
 
                    adios_error(err_no_memory, "Cannot allocate memory for Datatap.");
1088
 
                    return NULL;                                  
1089
 
                }
1090
 
                new_chunk->global_bounds = (uint64_t *) malloc(current_var->ndims * sizeof(uint64_t));  
1091
 
                if(!new_chunk->global_bounds) {
1092
 
                    adios_error(err_no_memory, "Cannot allocate memory for Datatap.");
1093
 
                    return NULL;                                  
1094
 
                }
1095
 
                new_chunk->global_offsets = (uint64_t *) malloc(current_var->ndims * sizeof(uint64_t));  
1096
 
                if(!new_chunk->global_offsets) {
1097
 
                    adios_error(err_no_memory, "Cannot allocate memory for Datatap.");
1098
 
                    return NULL;                                  
1099
 
                }
1100
 
                int i;
1101
 
                for(i = 0; i < current_var->ndims; i ++) {
1102
 
                    new_chunk->local_bounds[i] = *(uint64_t *)current_pos;
1103
 
                    current_pos += sizeof(uint64_t);  
1104
 
                    new_chunk->global_bounds[i] = *(uint64_t *)current_pos;
1105
 
                    current_pos += sizeof(uint64_t);  
1106
 
                    new_chunk->global_offsets[i] = *(uint64_t *)current_pos;
1107
 
                    current_pos += sizeof(uint64_t);  
1108
 
                }                          
1109
 
            }
1110
 
        }    
1111
 
        // TODO: at this point, we no longer need var_info_buffer so free it
1112
 
        // free(current_chunk->var_info_buffer);
1113
 
    }
1114
 
        
1115
 
 
1116
 
    // TODO: replicate meta-data among peer reader
1117
 
    ds->num_vars_peer = 0;
1118
 
    ds->vars_peer = NULL;
1119
 
 
1120
 
    if(total_num_readers > 1)
1121
 
    {
1122
 
        char *send_buf = NULL;
1123
 
        char *recv_buf = NULL; 
1124
 
        int send_count = VAR_INFO_SIZE, recv_count = VAR_INFO_SIZE;
1125
 
        if(my_rank == 0) {
1126
 
            for(i = 0; i < ds->f_info->num_chunks; i ++) {
1127
 
                chunk_info *current_chunk = &(ds->f_info->chunks[i]);
1128
 
fprintf(stderr, "im here rank %d %d %s:%d\n", ds->my_rank, current_chunk->rank,__FILE__,__LINE__);
1129
 
                if(current_chunk->rank == 0) {
1130
 
                    send_buf = ds->f_info->chunks[i].var_info_buffer;
1131
 
                    break;
1132
 
                } 
1133
 
            }
1134
 
            
1135
 
            // TODO: before we do this, we need to make sure this one has sth special
1136
 
            // the only case we need to deal with is local array with only one chunk
1137
 
            // we will check this when calling read_var
1138
 
 
1139
 
fprintf(stderr, "im here rank %d %s:%d\n", ds->my_rank, __FILE__,__LINE__);
1140
 
            int rc = MPI_Bcast(send_buf, send_count, MPI_BYTE, 0, comm);
1141
 
            if(rc != MPI_SUCCESS) {
1142
 
                fprintf(stderr, "rank %d: MPI_Scatter returns error (%d). %s:%d\n",
1143
 
                    my_rank, rc, __FILE__, __LINE__);
1144
 
                return NULL;
1145
 
            }
1146
 
        }
1147
 
        else {
1148
 
            recv_buf = (char *) malloc(VAR_INFO_SIZE); 
1149
 
            if(!recv_buf) {
1150
 
                adios_error(err_no_memory, "Cannot allocate memory for Datatap.");
1151
 
                return NULL;
1152
 
            }
1153
 
 
1154
 
fprintf(stderr, "im here rank %d %s:%d\n", ds->my_rank, __FILE__,__LINE__);
1155
 
            int rc = MPI_Bcast(recv_buf, recv_count, MPI_BYTE, 0, comm);
1156
 
            if(rc != MPI_SUCCESS) {
1157
 
                fprintf(stderr, "rank %d: MPI_Scatter returns error (%d). %s:%d\n",
1158
 
                    my_rank, rc, __FILE__, __LINE__);
1159
 
                return NULL;
1160
 
            }
1161
 
        }
1162
 
 
1163
 
        if(my_rank != 0) {
1164
 
            // parse the var_info buffer
1165
 
            char *current_pos = recv_buf;
1166
 
            int total_var_info_size = *(int *)current_pos; // total size
1167
 
            current_pos += 4;
1168
 
 
1169
 
            current_pos += 4; // host language
1170
 
 
1171
 
            int group_name_len = *(int *)current_pos; // size of group name
1172
 
            current_pos += 4;
1173
 
            current_pos += group_name_len;
1174
 
 
1175
 
            int num_vars = *(int *)current_pos; // total num of vars
1176
 
            current_pos += 4;
1177
 
 
1178
 
            datatap_var_info *current_var;
1179
 
            char *end = recv_buf + total_var_info_size;
1180
 
            while(current_pos < end) {
1181
 
                // size of this var info
1182
 
                int var_info_size = *(int *) current_pos;
1183
 
                char *start_of_next_var = current_pos + var_info_size;
1184
 
                current_pos += sizeof(int);
1185
 
                int var_id = *(int *) current_pos;
1186
 
                current_pos += sizeof(int);
1187
 
                int varname_len = *(int *) current_pos;
1188
 
                current_pos += sizeof(int);
1189
 
                char *varname = current_pos;
1190
 
                current_pos += varname_len;
1191
 
                int varpath_len = *(int *) current_pos;
1192
 
                current_pos += sizeof(int);
1193
 
                char *varpath = current_pos;
1194
 
                current_pos += varpath_len;
1195
 
 
1196
 
                // now we go through the current list of vars to see if this is new
1197
 
                current_var = ds->vars;
1198
 
                while(current_var != NULL) {
1199
 
                    //if(!strcmp(current_var->varname, varname) &&
1200
 
                    //    !strcmp(current_var->varpath, varpath)) {
1201
 
                    if(var_id == current_var->id) {
1202
 
                        // this var is locally available, so skip it
1203
 
                        break;
1204
 
                    }
1205
 
                    else {
1206
 
                        current_var = current_var->next;
1207
 
                    }
1208
 
                }
1209
 
 
1210
 
                if(current_var) { // this var is locally available
1211
 
                    current_pos = start_of_next_var;
1212
 
                    continue;
1213
 
                }
1214
 
 
1215
 
                current_var = (datatap_var_info *) malloc(sizeof(datatap_var_info));
1216
 
                if(!current_var) {
1217
 
                    adios_error(err_no_memory, "Cannot allocate memory for Datatap.");
1218
 
                    return NULL;
1219
 
                }
1220
 
                current_var->id = var_id;
1221
 
                current_var->varname = strdup(varname);
1222
 
                current_var->varpath = strdup(varpath);
1223
 
                current_var->type = *(enum ADIOS_DATATYPES *) current_pos;
1224
 
                current_pos += sizeof(enum ADIOS_DATATYPES);
1225
 
                current_var->time_dim = *(int *) current_pos;
1226
 
                current_pos += sizeof(int);
1227
 
                current_var->ndims = *(int *) current_pos;
1228
 
                current_pos += sizeof(int);
1229
 
                current_var->num_chunks = 0;
1230
 
                current_var->chunks = NULL;
1231
 
                if(!current_var->ndims) { // scalars and strings
1232
 
                    current_var->data_size = common_read_type_size(current_var->type, current_pos);
1233
 
                }
1234
 
                current_var->next = ds->vars_peer;
1235
 
                ds->vars_peer = current_var;
1236
 
                ds->num_vars_peer ++;
1237
 
 
1238
 
                datatap_var_chunk *new_chunk = (datatap_var_chunk *) malloc(sizeof(datatap_var_chunk));
1239
 
                if(!new_chunk) {
1240
 
                    adios_error(err_no_memory, "Cannot allocate memory for Datatap.");
1241
 
                    return NULL;
1242
 
                }
1243
 
                new_chunk->next = current_var->chunks;
1244
 
                current_var->chunks = new_chunk;
1245
 
                current_var->num_chunks ++;
1246
 
 
1247
 
                new_chunk->rank = 0;
1248
 
                if(!current_var->ndims) { // scalars and strings
1249
 
                    // copy data value
1250
 
                    new_chunk->data = malloc(current_var->data_size);
1251
 
                    if(!new_chunk->data) {
1252
 
                        adios_error(err_no_memory, "Cannot allocate memory for Datatap.");
1253
 
                        return NULL;
1254
 
                    }
1255
 
                    memcpy(new_chunk->data, current_pos, current_var->data_size);
1256
 
                    current_pos += current_var->data_size;
1257
 
                }
1258
 
                else { // arrays
1259
 
                    new_chunk->local_bounds = (uint64_t *) malloc(current_var->ndims * sizeof(uint64_t));
1260
 
                    if(!new_chunk->local_bounds) {
1261
 
                        adios_error(err_no_memory, "Cannot allocate memory for Datatap.");
1262
 
                        return NULL;
1263
 
                    }
1264
 
                    new_chunk->global_bounds = (uint64_t *) malloc(current_var->ndims * sizeof(uint64_t));
1265
 
                    if(!new_chunk->global_bounds) {
1266
 
                        adios_error(err_no_memory, "Cannot allocate memory for Datatap.");
1267
 
                        return NULL;
1268
 
                    }
1269
 
                    new_chunk->global_offsets = (uint64_t *) malloc(current_var->ndims * sizeof(uint64_t));
1270
 
                    if(!new_chunk->global_offsets) {
1271
 
                        adios_error(err_no_memory, "Cannot allocate memory for Datatap.");
1272
 
                        return NULL;
1273
 
                    }
1274
 
                    int i;
1275
 
                    for(i = 0; i < current_var->ndims; i ++) {
1276
 
                        new_chunk->local_bounds[i] = *(uint64_t *)current_pos;
1277
 
                        current_pos += sizeof(uint64_t);
1278
 
                        new_chunk->global_bounds[i] = *(uint64_t *)current_pos;
1279
 
                        current_pos += sizeof(uint64_t);
1280
 
                        new_chunk->global_offsets[i] = *(uint64_t *)current_pos;
1281
 
                        current_pos += sizeof(uint64_t);
1282
 
                    }
1283
 
                }
1284
 
            }
1285
 
            
1286
 
            free(recv_buf);   
1287
 
        }
1288
 
    }
1289
 
 
1290
 
fprintf(stderr, "im here %d %d %d %s:%d\n", ds->host_language_fortran, adios_flag_yes, futils_is_called_from_fortran(), __FILE__, __LINE__);
1291
 
    // TODO: here we need to ajust array dimension if the reader is in C 
1292
 
    // but writer is in Fortran or vice versa
1293
 
    if(ds->host_language_fortran == adios_flag_yes && !futils_is_called_from_fortran()) {
1294
 
        // reader is in C but writer is in Fortran, there are several things to adjust:
1295
 
        // array dimension index starts from 1 in Fortran --> start from 0 in C
1296
 
        // change array dimension order (including time dimension)
1297
 
        datatap_var_info *v = ds->vars;        
1298
 
        while(v) {
1299
 
            datatap_var_chunk *chunk = v->chunks; 
1300
 
            while(chunk) {
1301
 
                int i;
1302
 
                uint64_t temp;
1303
 
                for(i = 0; i < v->ndims/2; i ++) {
1304
 
                    temp = chunk->local_bounds[v->ndims-i-1];
1305
 
                    chunk->local_bounds[v->ndims-i-1] = chunk->local_bounds[i];
1306
 
                    chunk->local_bounds[i] = temp;
1307
 
                    temp = chunk->global_bounds[v->ndims-i-1];
1308
 
                    chunk->global_bounds[v->ndims-i-1] = chunk->global_bounds[i];
1309
 
                    chunk->global_bounds[i] = temp;
1310
 
                    temp = chunk->global_offsets[v->ndims-i-1];
1311
 
                    chunk->global_offsets[v->ndims-i-1] = chunk->global_offsets[i];
1312
 
                    chunk->global_offsets[i] = temp;
1313
 
                }
1314
 
                chunk = chunk->next;
1315
 
            }
1316
 
            if(v->time_dim > 0) { // -1 means no time dimension
1317
 
                v->time_dim = v->ndims - v->time_dim;  
1318
 
            }
1319
 
            v = v->next;
1320
 
        }
1321
 
 
1322
 
        v = ds->vars_peer;
1323
 
        while(v) {
1324
 
            datatap_var_chunk *chunk = v->chunks;
1325
 
            while(chunk) {
1326
 
                int i;
1327
 
                for(i = 0; i < v->ndims; i ++) {
1328
 
                    chunk->local_bounds[i] --;
1329
 
                    //chunk->global_offsets[i] --;
1330
 
                }
1331
 
                uint64_t temp;
1332
 
                for(i = 0; i < v->ndims/2; i ++) {
1333
 
                    temp = chunk->local_bounds[v->ndims-i-1];
1334
 
                    chunk->local_bounds[v->ndims-i-1] = chunk->local_bounds[i];
1335
 
                    chunk->local_bounds[i] = temp;
1336
 
                    temp = chunk->global_bounds[v->ndims-i-1];
1337
 
                    chunk->global_bounds[v->ndims-i-1] = chunk->global_bounds[i];
1338
 
                    chunk->global_bounds[i] = temp;
1339
 
                    temp = chunk->global_offsets[v->ndims-i-1];
1340
 
                    chunk->global_offsets[v->ndims-i-1] = chunk->global_offsets[i];
1341
 
                    chunk->global_offsets[i] = temp;
1342
 
                }
1343
 
                chunk = chunk->next;
1344
 
            }
1345
 
            v->time_dim = v->ndims - v->time_dim - 1;
1346
 
            v = v->next;
1347
 
        }
1348
 
    }
1349
 
    else if(ds->host_language_fortran == adios_flag_no && futils_is_called_from_fortran()) {
1350
 
        // adjuct dimension C --> Fortran  
1351
 
        // TODO: for the demo, this will not happen so leave it here
1352
 
    }
1353
 
 
1354
 
    fp = (ADIOS_FILE *) malloc(sizeof(ADIOS_FILE));
1355
 
    if(!fp) {
1356
 
        adios_error(err_no_memory, "Cannot allocate memory for file info.");
1357
 
        free(ds);
1358
 
        return NULL;
1359
 
    }
1360
 
 
1361
 
    fp->fh = (uint64_t) ds;
1362
 
    fp->groups_count = 1; // TODO: assume one group per file
1363
 
    fp->vars_count = 0; // TODO: just do not use this filed
1364
 
    fp->attrs_count = 0; // TODO: do not support attributes yet
1365
 
 
1366
 
    // TODO: since we require fopen/fclose for each timestep, 
1367
 
    // so there is always only 1 timestep in file
1368
 
    // TODO: set ntimesteps to be max to make pixie3d working
1369
 
    fp->tidx_start = ds->f_info->timestep;  
1370
 
    fp->ntimesteps = INT32_MAX;
1371
 
fprintf(stderr, "im here timestep %d %s:%d\n", fp->tidx_start = ds->f_info->timestep, __FILE__,__LINE__);
1372
 
 
1373
 
    fp->file_size = 0; 
1374
 
    fp->version = 1;
1375
 
    fp->endianness = 0; 
1376
 
    alloc_namelist(&fp->group_namelist, fp->groups_count); 
1377
 
    for (i = 0; i < fp->groups_count; i++) {
1378
 
        if (!fp->group_namelist[i])  {
1379
 
            adios_error(err_no_memory, "Cannot allocate buffer in adios_fopen()");
1380
 
            return NULL;
1381
 
        }
1382
 
        else {
1383
 
            strcpy(fp->group_namelist[i], ds->group_name); 
1384
 
        }
1385
 
    }
1386
 
fprintf(stderr, "im here %s:%d\n", __FILE__,__LINE__);
1387
 
 
1388
 
    // TODO: return code of adios_errno for fopen:
1389
 
    // 0: file metadata is available and everything is success
1390
 
    // we need a value telling that the file is not available yet
1391
 
    // we also need a value telling that writer is finishing so don't
1392
 
    // read this file any more
1393
 
 
1394
 
    return fp;  
1395
 
}
1396
 
 
1397
 
int adios_read_datatap_fclose(ADIOS_FILE *fp)
1398
 
{
1399
 
fprintf(stderr, "im here rank %d %s:%d\n",dt_read_data->dt_comm_rank,__FILE__,__LINE__);
1400
 
    datatap_read_file_data *ds = (datatap_read_file_data *) fp->fh;
1401
 
 
1402
 
    adios_errno = 0;
1403
 
 
1404
 
    dt_read_data->num_io_dumps ++;
1405
 
 
1406
 
    // recycle the queue
1407
 
    QueueItem *qi;
1408
 
 
1409
 
    while((!queue_dequeue(ds->f_info->dt_queue, &qi)) && qi) {
1410
 
        free(qi->data);
1411
 
        free(qi);  
1412
 
    }
1413
 
 
1414
 
    // now we 'read' the data into user-provided buffers
1415
 
    // note that the data is actually moved by dt servr thread
1416
 
    // here we just re-organize the data into the distribution
1417
 
    // user wants
1418
 
 
1419
 
#if 0
1420
 
    // process the incoming data chunks 
1421
 
    void *d = NULL;
1422
 
    QueueItem *qi;
1423
 
    uint64_t num_chunks_processed = 0;
1424
 
    int file_done = 0;
1425
 
 
1426
 
    while(1) {
1427
 
        pthread_mutex_lock(&(ds->f_info->mutex));
1428
 
                while(queue_size(ds->f_info->dt_queue) == 0) {
1429
 
        fprintf(stderr, "im here num_chunks_processed %d %d %s:%d\n",num_chunks_processed, ds->num_pgs,__FILE__,__LINE__);
1430
 
            // check if it's time to finish this file
1431
 
            if(num_chunks_processed == ds->num_pgs) { // TODO
1432
 
fprintf(stderr, "im here %s:%d\n",__FILE__,__LINE__);
1433
 
                dt_read_data->num_io_dumps ++;
1434
 
                pthread_mutex_unlock(&(ds->f_info->mutex));
1435
 
                file_done = 1;
1436
 
                break;
1437
 
            }
1438
 
            else { 
1439
 
fprintf(stderr, "im here %s:%d\n",__FILE__,__LINE__);
1440
 
                pthread_cond_signal(&(ds->f_info->cond1));
1441
 
                pthread_cond_wait(&(ds->f_info->cond2), &(ds->f_info->mutex));
1442
 
            }
1443
 
        }
1444
 
fprintf(stderr, "im here %s:%d\n",__FILE__,__LINE__);
1445
 
 
1446
 
        if(file_done) {
1447
 
fprintf(stderr, "im here %s:%d\n",__FILE__,__LINE__);
1448
 
            break;
1449
 
        }
1450
 
 
1451
 
        // start of busy time
1452
 
        while((!queue_dequeue(ds->f_info->dt_queue, &d)) && d) {
1453
 
            //pthread_cond_signal(&(ds->f_info->cond1));
1454
 
            pthread_mutex_unlock(&(ds->f_info->mutex));
1455
 
 
1456
 
            qi =(QueueItem *) d;
1457
 
 
1458
 
fprintf(stderr, "im here rank %d %s:%d\n",dt_read_data->dt_comm_rank,__FILE__,__LINE__);
1459
 
            int rc = reorganize_array (qi, ds);
1460
 
fprintf(stderr, "im here %s:%d\n",__FILE__,__LINE__);
1461
 
            if(rc) {
1462
 
                adios_error(err_unspecified, "Error in reorganize_array() function.");
1463
 
                exit(-1);
1464
 
            }
1465
 
 
1466
 
            // recycle decoded data buffer
1467
 
            free(qi->data);
1468
 
            free(qi);
1469
 
 
1470
 
            num_chunks_processed ++;
1471
 
 
1472
 
fprintf(stderr, "im here %s:%d\n",__FILE__,__LINE__);
1473
 
            pthread_mutex_unlock(&(ds->f_info->mutex));
1474
 
fprintf(stderr, "im here %s:%d\n",__FILE__,__LINE__);
1475
 
            pthread_mutex_lock(&(ds->f_info->mutex));
1476
 
fprintf(stderr, "im here %s:%d\n",__FILE__,__LINE__);
1477
 
        }
1478
 
fprintf(stderr, "im here %s:%d\n",__FILE__,__LINE__);
1479
 
 
1480
 
        pthread_mutex_unlock(&(ds->f_info->mutex));
1481
 
    }
1482
 
#endif    
1483
 
 
1484
 
fprintf(stderr, "im here %s:%d\n",__FILE__,__LINE__);
1485
 
    // notify dt server thread that we are done with this file 
1486
 
    int rc = datatap_release_file(dt_read_data->dt_handle, ds->f_info);
1487
 
    if(rc != 0) {
1488
 
        adios_error(err_unspecified, "Could not close file.");
1489
 
        return -1;
1490
 
    }
1491
 
 
1492
 
    free_namelist((fp->group_namelist), fp->groups_count);
1493
 
    if (ds->file_name) { 
1494
 
        free(ds->file_name); 
1495
 
        ds->file_name = NULL; 
1496
 
    }
1497
 
    
1498
 
    if (ds->group_name) {
1499
 
        free(ds->group_name);
1500
 
        ds->group_name = NULL;
1501
 
    }
1502
 
 
1503
 
    // TODO release pg_info and var_info 
1504
 
    datatap_var_info *v = ds->vars;
1505
 
    datatap_var_info *tmp_v;
1506
 
    while(v) {
1507
 
        tmp_v = v;
1508
 
        free(v->varname);
1509
 
        free(v->varpath);
1510
 
        datatap_var_chunk *chunk = v->chunks;
1511
 
        datatap_var_chunk *tmp_chunk;
1512
 
        while(chunk) {
1513
 
            tmp_chunk = chunk;
1514
 
            if(v->ndims) {
1515
 
                free(chunk->local_bounds);
1516
 
                free(chunk->global_bounds);
1517
 
                free(chunk->global_offsets);
1518
 
            }
1519
 
            else {
1520
 
                free(chunk->data);     
1521
 
            }
1522
 
            chunk = chunk->next;
1523
 
            free(tmp_chunk);
1524
 
        }
1525
 
        v = v->next;
1526
 
        free(tmp_v);
1527
 
    }
1528
 
 
1529
 
fprintf(stderr, "im here %s:%d\n",__FILE__,__LINE__);
1530
 
    v = ds->vars_peer;
1531
 
    while(v) {
1532
 
        tmp_v = v;
1533
 
        free(v->varname);
1534
 
        free(v->varpath);
1535
 
        datatap_var_chunk *chunk = v->chunks;
1536
 
        datatap_var_chunk *tmp_chunk;
1537
 
        while(chunk) {
1538
 
            tmp_chunk = chunk;
1539
 
            if(v->ndims) {
1540
 
                free(chunk->local_bounds);
1541
 
                free(chunk->global_bounds);
1542
 
                free(chunk->global_offsets);
1543
 
            }
1544
 
            else {
1545
 
                free(chunk->data);
1546
 
            }
1547
 
            chunk = chunk->next;
1548
 
            free(tmp_chunk);
1549
 
        }
1550
 
        v = v->next;
1551
 
        free(tmp_v);
1552
 
    }
1553
 
fprintf(stderr, "im here %s:%d\n",__FILE__,__LINE__);
1554
 
 
1555
 
    free(ds);
1556
 
    free(fp);
1557
 
fprintf(stderr, "im here %s:%d\n",__FILE__,__LINE__);
1558
 
    return 0;
1559
 
}
1560
 
 
1561
 
ADIOS_GROUP * adios_read_datatap_gopen (ADIOS_FILE *fp, const char *grpname)
1562
 
{
1563
 
fprintf(stderr, "im here %s:%d\n", __FILE__,__LINE__);
1564
 
    if(!grpname) {
1565
 
        adios_error(err_invalid_group, "Group name is not valid");
1566
 
        return NULL;
1567
 
    }
1568
 
    int i;
1569
 
    for(i = 0; i < fp->groups_count; i ++) {   
1570
 
        if(!strcmp(grpname, fp->group_namelist[i])) {
1571
 
            return adios_read_datatap_gopen_byid(fp, i);
1572
 
        }
1573
 
    }
1574
 
    adios_error(err_invalid_group, "Group %s is not valid", grpname);
1575
 
    return NULL;
1576
 
}
1577
 
 
1578
 
ADIOS_GROUP * adios_read_datatap_gopen_byid (ADIOS_FILE *fp, int grpid)
1579
 
{
1580
 
fprintf(stderr, "im here %s:%d\n", __FILE__,__LINE__);
1581
 
    datatap_read_file_data *ds = (datatap_read_file_data *) fp->fh;
1582
 
    ADIOS_GROUP * gp;
1583
 
 
1584
 
    adios_errno = 0;
1585
 
 
1586
 
    gp = (ADIOS_GROUP *) malloc(sizeof(ADIOS_GROUP));
1587
 
    if (!gp) {
1588
 
        adios_error(err_no_memory, "Could not allocate memory for group info");
1589
 
        return NULL;
1590
 
    }
1591
 
 
1592
 
    // TODO: again, assume one group per file
1593
 
    gp->grpid = grpid;
1594
 
    gp->gh = (uint64_t) 0; // TODO: we should re-organize the metadata
1595
 
    gp->fp = fp;
1596
 
    gp->attrs_count = 0; // attributes are not supported yet
1597
 
    
1598
 
fprintf(stderr, "im here rank %d %s:%d\n", ds->my_rank, __FILE__,__LINE__);
1599
 
    // generate a list of variables with distinct names among all pgs
1600
 
    gp->vars_count = ds->num_vars + ds->num_vars_peer;
1601
 
fprintf(stderr, "im here count %d %s:%d\n", gp->vars_count,__FILE__,__LINE__);
1602
 
    
1603
 
    // to return a globally consistently ordered var list, we sort the list by var id
1604
 
    datatap_var_info **vars = (datatap_var_info *) malloc(sizeof(datatap_var_info *) * gp->vars_count);
1605
 
    if(!vars) {
1606
 
        adios_error(err_no_memory, "Cannot allocate buffer in adios_read_datatap_gopen_byid()");
1607
 
        return NULL;
1608
 
    }
1609
 
    memset(vars, 0, sizeof(datatap_var_info *) * gp->vars_count);
1610
 
 
1611
 
fprintf(stderr, "im here rank %d %s:%d\n", ds->my_rank, __FILE__,__LINE__);
1612
 
    datatap_var_info *current_var = ds->vars_peer;
1613
 
    datatap_var_info *var_to_sort = current_var;
1614
 
    while(var_to_sort) {
1615
 
        int j;
1616
 
        for(j = 0; j < gp->vars_count; j ++) {
1617
 
            if(vars[j] == NULL) {
1618
 
                vars[j] = var_to_sort;
1619
 
                current_var = current_var->next;
1620
 
                var_to_sort = current_var;
1621
 
                break; 
1622
 
            }
1623
 
            else if(vars[j]->id < var_to_sort->id) {
1624
 
                // move vars[j] 
1625
 
                datatap_var_info *tmp; 
1626
 
                tmp = vars[j];
1627
 
                vars[j] = var_to_sort;
1628
 
                var_to_sort = tmp;
1629
 
                break;
1630
 
            }
1631
 
        }
1632
 
    }
1633
 
fprintf(stderr, "im here rank %d %s:%d\n", ds->my_rank, __FILE__,__LINE__);
1634
 
 
1635
 
    current_var = ds->vars;
1636
 
    var_to_sort = current_var;
1637
 
    while(var_to_sort) {
1638
 
        int j;
1639
 
        for(j = 0; j < gp->vars_count; j ++) {
1640
 
            if(vars[j] == NULL) {
1641
 
                vars[j] = var_to_sort;
1642
 
                current_var = current_var->next;
1643
 
                var_to_sort = current_var;
1644
 
                break;
1645
 
            }
1646
 
            else if(vars[j]->id < var_to_sort->id) {
1647
 
                // move vars[j]
1648
 
                datatap_var_info *tmp;
1649
 
                tmp = vars[j];
1650
 
                vars[j] = var_to_sort;
1651
 
                var_to_sort = tmp;
1652
 
                break;
1653
 
            }
1654
 
        }
1655
 
    }
1656
 
    // now the vars list is in descendent order
1657
 
    gp->var_namelist = (char **) malloc(gp->vars_count * sizeof(char *));
1658
 
    if(!gp->var_namelist) {
1659
 
        adios_error(err_no_memory, "Cannot allocate buffer in adios_read_datatap_gopen_byid()");
1660
 
        return NULL;
1661
 
    }
1662
 
 
1663
 
fprintf(stderr, "im here rank %d %s:%d\n", ds->my_rank, __FILE__,__LINE__);
1664
 
    int i;
1665
 
    for(i = 0; i < gp->vars_count; i ++) {
1666
 
        int index = gp->vars_count - i - 1;
1667
 
        gp->var_namelist[i] = (char *) malloc(strlen(vars[index]->varname) +
1668
 
            strlen(vars[index]->varpath) + 2);
1669
 
        if(!gp->var_namelist[i]) {
1670
 
            adios_error(err_no_memory, "Cannot allocate buffer in adios_read_datatap_gopen_byid()");
1671
 
            return NULL;
1672
 
        }
1673
 
        // return full path name
1674
 
        // TODO: make sure the size of var_namelist[j] is enough
1675
 
        if(!strcmp(vars[index]->varpath, "/")) {
1676
 
            sprintf(gp->var_namelist[i], "/%s\0\0", vars[index]->varname);
1677
 
        }
1678
 
        else {
1679
 
            sprintf(gp->var_namelist[i], "%s/%s\0", vars[index]->varpath,
1680
 
                vars[index]->varname);
1681
 
        }
1682
 
    }
1683
 
fprintf(stderr, "im here rank %d %s:%d\n", ds->my_rank, __FILE__,__LINE__);
1684
 
 
1685
 
    // here we construct a bitmap of var list and send it to rank 0 so rank 0 knows 
1686
 
    // which vars are only avaialble in its local memory
1687
 
    memset(ds->var_bitmap, 0, VAR_BITMAP_SIZE);
1688
 
    current_var = ds->vars_peer;
1689
 
    while(current_var) {           
1690
 
        int byte_pos = current_var->id / 8;
1691
 
        int bit_pos = current_var->id % 8;
1692
 
        unsigned char mask = 0x01 << bit_pos;
1693
 
        ds->var_bitmap[byte_pos] = ds->var_bitmap[byte_pos] | mask;
1694
 
        current_var = current_var->next;
1695
 
    }
1696
 
 
1697
 
fprintf(stderr, "im here rank %d %s:%d\n", ds->my_rank, __FILE__,__LINE__);
1698
 
    // send back to rank 0 so it knows which var is missing on other processes
1699
 
    unsigned char *combined_bitmap = NULL;
1700
 
    int combined_size = ds->comm_size * VAR_BITMAP_SIZE;
1701
 
    if(ds->my_rank == 0) {
1702
 
        combined_bitmap = (unsigned char *) malloc(combined_size);
1703
 
        if(!combined_size) {
1704
 
            adios_error(err_no_memory, "Cannot allocate buffer in adios_read_datatap_gopen_byid()");
1705
 
            return NULL;
1706
 
        }
1707
 
    }
1708
 
    MPI_Gather(ds->var_bitmap, VAR_BITMAP_SIZE, MPI_BYTE, combined_bitmap, VAR_BITMAP_SIZE, MPI_BYTE, 0, ds->comm);
1709
 
 
1710
 
fprintf(stderr, "im here rank %d %s:%d\n", ds->my_rank, __FILE__,__LINE__);
1711
 
    if(ds->my_rank == 0) {
1712
 
        // now determine which var is missing on other processes 
1713
 
        for(i = 0; i < ds->comm_size; i ++) {
1714
 
            int j;
1715
 
            for(j = 0; j < VAR_BITMAP_SIZE; j ++) {
1716
 
                ds->var_bitmap[j] = ds->var_bitmap[j] | combined_bitmap[i*VAR_BITMAP_SIZE+j];
1717
 
            }
1718
 
        }
1719
 
        free(combined_bitmap);
1720
 
        // if the corresponding bit is set in ds->var_bitmap, then the var is missing on other processes
1721
 
    }
1722
 
 
1723
 
fprintf(stderr, "im here rank %d %s:%d\n", ds->my_rank, __FILE__,__LINE__);
1724
 
#if 0
1725
 
    do {
1726
 
        if(i == 0) { // the first pg
1727
 
            gp->vars_count = ds->pgs[0].num_vars;
1728
 
            gp->var_namelist = (char **) malloc(gp->vars_count * sizeof(char *));
1729
 
            if(!gp->var_namelist) {
1730
 
                adios_error(err_no_memory, "Cannot allocate buffer in adios_read_datatap_gopen_byid()");
1731
 
                return NULL;
1732
 
            }
1733
 
            for(j = 0; j < gp->vars_count; j ++) {
1734
 
                gp->var_namelist[j] = (char *) malloc(strlen(ds->pgs[i].vars[j].varname) + 
1735
 
                    strlen(ds->pgs[i].vars[j].varpath) + 2);
1736
 
                if(!gp->var_namelist[j]) {
1737
 
                    adios_error(err_no_memory, "Cannot allocate buffer in adios_read_datatap_gopen_byid()");
1738
 
                    return NULL;
1739
 
                }   
1740
 
                // return full path name   
1741
 
                // TODO: make sure the size of var_namelist[j] is enough
1742
 
                if(!strcmp(ds->pgs[i].vars[j].varpath, "/")) {
1743
 
                    sprintf(gp->var_namelist[j], "/%s\0\0", ds->pgs[i].vars[j].varname);
1744
 
                }
1745
 
                else {
1746
 
                    sprintf(gp->var_namelist[j], "%s/%s\0", ds->pgs[i].vars[j].varpath, 
1747
 
                        ds->pgs[i].vars[j].varname);
1748
 
                }    
1749
 
            }
1750
 
            k = gp->vars_count;
1751
 
            i ++;
1752
 
            continue;
1753
 
        } 
1754
 
 
1755
 
        // go over the vars in the ith pg 
1756
 
        for(j = 0; j < ds->pgs[i].num_vars; j ++) {
1757
 
            // first, we need to make sure the var is not seen before
1758
 
            int t;
1759
 
            int is_new = 1;
1760
 
            char fullname[256];
1761
 
            if(!strcmp(ds->pgs[i].vars[j].varpath, "/")) {
1762
 
                sprintf(fullname, "/%s\0", ds->pgs[i].vars[j].varname);
1763
 
            }
1764
 
            else { 
1765
 
                sprintf(fullname, "%s/%s\0", ds->pgs[i].vars[j].varpath, ds->pgs[i].vars[j].varname);  
1766
 
            } 
1767
 
            for(t = 0; t < k; t++) {
1768
 
                if(!strcmp(fullname, gp->var_namelist[t])) { 
1769
 
                    is_new = 0;
1770
 
                    break;  
1771
 
                } 
1772
 
            }
1773
 
 
1774
 
            if(!is_new) continue;
1775
 
 
1776
 
            // now add this var to list
1777
 
            char **temp = gp->var_namelist;  
1778
 
//            temp = (char **) realloc(gp->var_namelist, (k+1) * sizeof(char*));
1779
 
            temp = (char **) malloc((k+1) * sizeof(char*));
1780
 
            if(!temp) {
1781
 
                adios_error(err_no_memory, "Cannot allocate buffer in adios_read_datatap_gopen_byid()");
1782
 
                return NULL;
1783
 
            }
1784
 
            else {
1785
 
                memcpy(temp, gp->var_namelist, k*sizeof(char *));
1786
 
                free(gp->var_namelist);
1787
 
                gp->var_namelist = temp;
1788
 
 
1789
 
            }
1790
 
             
1791
 
            // return full path name
1792
 
            gp->var_namelist[k] = strdup(fullname);
1793
 
            gp->vars_count ++; 
1794
 
            k ++; 
1795
 
        }
1796
 
 
1797
 
        i ++;
1798
 
    }
1799
 
    while(i < ds->num_pgs);
1800
 
#endif 
1801
 
 
1802
 
fprintf(stderr, "im here rank %d %s:%d\n", ds->my_rank, __FILE__,__LINE__);
1803
 
 
1804
 
    gp->attr_namelist = 0;
1805
 
 
1806
 
    gp->timestep = ds->f_info->timestep;
1807
 
    gp->lasttimestep = ds->f_info->timestep;
1808
 
 
1809
 
    // now we need to wait here until we see all data are ready
1810
 
    pthread_mutex_lock(&(ds->f_info->mutex));
1811
 
    while(queue_size(ds->f_info->dt_queue) != ds->f_info->num_chunks) {
1812
 
        pthread_cond_wait(&(ds->f_info->cond2), &(ds->f_info->mutex));
1813
 
    }
1814
 
    pthread_mutex_unlock(&(ds->f_info->mutex));
1815
 
 
1816
 
fprintf(stderr, "im here rank %d %s:%d\n", ds->my_rank, __FILE__,__LINE__);
1817
 
    return gp;
1818
 
}
1819
 
 
1820
 
int adios_read_datatap_gclose (ADIOS_GROUP *gp)
1821
 
{
1822
 
fprintf(stderr, "im here %s:%d\n",__FILE__,__LINE__);
1823
 
//    datatap_read_file_data *ds = (datatap_read_file_data *) gp->fp->fh;
1824
 
    adios_errno = 0;
1825
 
    free_namelist ((gp->attr_namelist),gp->attrs_count);
1826
 
    int i;
1827
 
    for(i = 0; i < gp->vars_count; i ++) {
1828
 
        free(gp->var_namelist[i]); 
1829
 
    }
1830
 
    free(gp->var_namelist);
1831
 
//    free_namelist ((gp->var_namelist),gp->vars_count);
1832
 
    free(gp);
1833
 
fprintf(stderr, "im here %s:%d\n",__FILE__,__LINE__);
1834
 
    return 0;
1835
 
 
1836
 
}
1837
 
 
1838
 
int adios_read_datatap_get_attr (ADIOS_GROUP *gp, const char *attrname, 
1839
 
                                 enum ADIOS_DATATYPES *type,
1840
 
                                 int *size, void **data)
1841
 
{
1842
 
    // TODO: borrowed from dimes
1843
 
    adios_error(err_invalid_read_method, "adios_read_datatap_get_attr is not implemented.");
1844
 
    *size = 0;
1845
 
    *type = adios_unknown;
1846
 
    *data = 0;
1847
 
    return adios_errno;
1848
 
}
1849
 
 
1850
 
int adios_read_datatap_get_attr_byid (ADIOS_GROUP *gp, int attrid, 
1851
 
                                      enum ADIOS_DATATYPES *type, 
1852
 
                                      int *size, void **data)
1853
 
{
1854
 
    // TODO: borrowed from dimes
1855
 
    adios_error(err_invalid_read_method, "adios_read_datatap_get_attr_byid is not implemented.");
1856
 
    *size = 0;
1857
 
    *type = adios_unknown;
1858
 
    *data = 0;
1859
 
    return adios_errno;
1860
 
}
1861
 
 
1862
 
ADIOS_VARINFO * adios_read_datatap_inq_var (ADIOS_GROUP *gp, const char *varname) 
1863
 
{
1864
 
    // TODO: usually user will read those variables reperesenting dimensions directly
1865
 
//    error(err_invalid_read_method, "adios_read_datatap_inq_var is not implemented.");
1866
 
 
1867
 
fprintf(stderr, "im here %s:%d\n",__FILE__,__LINE__);
1868
 
    // find the var among all pgs
1869
 
    ADIOS_VARINFO *v = (ADIOS_VARINFO *) malloc(sizeof(ADIOS_VARINFO));
1870
 
    if(!v) {
1871
 
        adios_error(err_no_memory, "Cannot allocate buffer in adios_read_datatap_inq_var()");
1872
 
        return NULL;
1873
 
    }
1874
 
    memset(v, 0, sizeof(ADIOS_VARINFO));
1875
 
 
1876
 
    datatap_read_file_data *ds = (datatap_read_file_data *) gp->fp->fh;
1877
 
    
1878
 
    int found = 0;
1879
 
    datatap_var_info *current_var = ds->vars;
1880
 
    while(current_var) {
1881
 
        if(!compare_var_name(varname, current_var)) { 
1882
 
            found = 1;
1883
 
            break;
1884
 
        }
1885
 
        else {
1886
 
            current_var = current_var->next;
1887
 
        } 
1888
 
    } 
1889
 
 
1890
 
    if(!found) {
1891
 
        current_var = ds->vars_peer;
1892
 
        while(current_var) {
1893
 
            if(!compare_var_name(varname, current_var)) {
1894
 
                found = 1;
1895
 
                break;
1896
 
            }
1897
 
            else {
1898
 
                current_var = current_var->next;
1899
 
            }
1900
 
        }
1901
 
    }
1902
 
  
1903
 
    if(found) {
1904
 
        v->grpid = gp->grpid;
1905
 
        int i;
1906
 
        for(i = 0; i < gp->vars_count; i ++) {
1907
 
            if(!strcmp(gp->var_namelist[i], varname)) {
1908
 
                v->varid = i; // TODO: this may not be cmpatible with BP
1909
 
                break;
1910
 
            }
1911
 
        }
1912
 
        v->type = current_var->type;
1913
 
        v->ndim = current_var->ndims;
1914
 
        v->timedim = current_var->time_dim;
1915
 
        if(!v->ndim) { // scalar and string
1916
 
            if(v->timedim != -1) { // scalar with time dimension
1917
 
                v->ndim = 1;
1918
 
                v->dims = (uint64_t *) malloc(sizeof(uint64_t));
1919
 
                if(!v->dims) {
1920
 
                    adios_error(err_no_memory, "Cannot allocate buffer in adios_read_datatap_inq_var()");
1921
 
                    return NULL;
1922
 
                }
1923
 
                v->dims[0] = 1; // TODO: only one timestep in the file
1924
 
            }
1925
 
            //int value_size = common_read_type_size(v->type, current_var->chunks->data);
1926
 
            int value_size = current_var->data_size;
1927
 
            v->value = malloc(value_size);
1928
 
            if(!v->value) {
1929
 
                adios_error(err_no_memory, "Cannot allocate buffer in adios_read_datatap_inq_var()");
1930
 
                return NULL;
1931
 
            }
1932
 
            memcpy(v->value, current_var->chunks->data, value_size);
1933
 
        }
1934
 
        else { // arrays
1935
 
            v->dims = (uint64_t *) malloc(v->ndim * sizeof(uint64_t));
1936
 
            if(!v->dims) {
1937
 
                adios_error(err_no_memory, "Cannot allocate buffer in adios_read_datatap_inq_var()");
1938
 
                return NULL;
1939
 
            }
1940
 
            int k;
1941
 
            for(k = 0; k < v->ndim; k ++) {
1942
 
                //v->dims[k] = ds->pgs[i].vars[j].global_bounds[k];
1943
 
                v->dims[k] = current_var->chunks->global_bounds[k];
1944
 
            }
1945
 
        }
1946
 
        return v;
1947
 
    }
1948
 
    else {
1949
 
        adios_error(err_invalid_varname, "Cannot find var %s\n", varname);
1950
 
        return NULL;
1951
 
    }
1952
 
 
1953
 
    
1954
 
#if 0
1955
 
    int i, j;
1956
 
    for(i = 0; i < ds->num_pgs; i ++) {
1957
 
        for(j = 0; j < ds->pgs[i].num_vars; j ++) {
1958
 
            // the parameter varname can be full path or just var name, so we
1959
 
            // need to first find it by matching the name
1960
 
            if(!compare_var_name(varname, &(ds->pgs[i].vars[j]))) {     
1961
 
                v->grpid = gp->grpid;
1962
 
                v->varid = j; // TODO: this may not be cmpatible with BP 
1963
 
                v->type = ds->pgs[i].vars[j].type;
1964
 
                v->ndim = ds->pgs[i].vars[j].ndims; 
1965
 
                v->timedim = ds->pgs[i].vars[j].time_dim;
1966
 
                if(!v->ndim) { // scalar
1967
 
                    if(v->timedim != -1) { // scalar with time dimension
1968
 
                        v->ndim = 1;
1969
 
                        v->dims = (uint64_t *) malloc(sizeof(uint64_t));
1970
 
                        if(!v->dims) {
1971
 
                            adios_error(err_no_memory, "Cannot allocate buffer in adios_read_datatap_inq_var()");
1972
 
                            return NULL;
1973
 
                        }
1974
 
                        v->dims[0] = 1; // TODO: only one timestep in the file
1975
 
                    }
1976
 
                    int value_size = common_read_type_size(v->type, ds->pgs[i].vars[j].data);
1977
 
                    v->value = malloc(value_size); 
1978
 
                    if(!v->value) {
1979
 
                        adios_error(err_no_memory, "Cannot allocate buffer in adios_read_datatap_inq_var()");
1980
 
                        return NULL;
1981
 
                    }
1982
 
                    memcpy(v->value, ds->pgs[i].vars[j].data, value_size);
1983
 
                }
1984
 
                else { // arrays  
1985
 
                    v->dims = (uint64_t *) malloc(v->ndim * sizeof(uint64_t));   
1986
 
                    if(!v->dims) {
1987
 
                        adios_error(err_no_memory, "Cannot allocate buffer in adios_read_datatap_inq_var()");
1988
 
                        return NULL;
1989
 
                    }
1990
 
                    int k;
1991
 
                    for(k = 0; k < v->ndim; k ++) {
1992
 
                        v->dims[k] = ds->pgs[i].vars[j].global_bounds[k]; 
1993
 
                    }
1994
 
                }
1995
 
                return v;
1996
 
            }
1997
 
        }
1998
 
    }
1999
 
 
2000
 
    return NULL;    
2001
 
#endif
2002
 
 
2003
 
fprintf(stderr, "im here %s:%d\n",__FILE__,__LINE__);
2004
 
}
2005
 
 
2006
 
ADIOS_VARINFO * adios_read_datatap_inq_var_byid (ADIOS_GROUP *gp, int varid)
2007
 
{
2008
 
    if(varid >= 0 && varid < gp->vars_count) {
2009
 
fprintf(stderr, "im here %s:%d\n",__FILE__,__LINE__);
2010
 
        return adios_read_datatap_inq_var(gp, gp->var_namelist[varid]);
2011
 
    }
2012
 
    else {
2013
 
        adios_error(err_invalid_varid, "Cannot find var %d\n", varid);
2014
 
        return NULL;
2015
 
    }
2016
 
}
2017
 
 
2018
 
void adios_read_datatap_free_varinfo (ADIOS_VARINFO *vp)
2019
 
{
2020
 
    if(!vp) return;
2021
 
 
2022
 
    if(!vp->ndim) { // scalar
2023
 
        if(vp->timedim != -1) { // scalar with time dimension
2024
 
            free(vp->dims);
2025
 
        }
2026
 
        free(vp->value);
2027
 
    }
2028
 
    else { // arrays
2029
 
        free(vp->dims);
2030
 
    }
2031
 
}
2032
 
 
2033
 
int64_t adios_read_datatap_read_var (ADIOS_GROUP *gp, const char *varname,
2034
 
                                     const uint64_t *start, const uint64_t *count,
2035
 
                                     void *data)
2036
 
{
2037
 
fprintf(stderr, "im here read var %s addr %p %s:%d\n", varname, data,__FILE__,__LINE__);
2038
 
    int64_t total_size;
2039
 
    datatap_read_file_data *ds = (datatap_read_file_data *) gp->fp->fh;
2040
 
    int found = 0;
2041
 
 
2042
 
fprintf(stderr, "im here rank %d %s %s:%d\n", ds->my_rank,varname, __FILE__,__LINE__);
2043
 
    datatap_var_info *current_var = ds->vars;
2044
 
    while(current_var) {
2045
 
fprintf(stderr, "im here rank %d %s %s %s %s:%d\n", ds->my_rank, varname, current_var->varname,current_var->varpath,__FILE__,__LINE__);
2046
 
        if(!compare_var_name(varname, current_var)) {
2047
 
fprintf(stderr, "im here rank %d %s %s %s %s:%d\n", ds->my_rank, varname, current_var->varname,current_var->varpath,__FILE__,__LINE__);
2048
 
            // found it locally
2049
 
            found = 1;
2050
 
            if(!current_var->ndims) { // scalar
2051
 
fprintf(stderr, "im here rank %d %s %s %s %s:%d\n", ds->my_rank, varname, current_var->varname,current_var->varpath,__FILE__,__LINE__);
2052
 
                // TODO: check time dimension if there is any
2053
 
                if(current_var->time_dim != -1 &&
2054
 
                    (gp->fp->tidx_start != start[0] || count[0] > 1)) {
2055
 
                // TODO: check time dimension if there is any
2056
 
                    adios_error(err_no_data_at_timestep, "Specified time step is not available.");
2057
 
                    return -1;
2058
 
                }
2059
 
 
2060
 
                total_size = current_var->data_size;
2061
 
                memcpy(data, current_var->chunks->data, total_size);
2062
 
                return total_size;
2063
 
            }
2064
 
            else { // arrays
2065
 
fprintf(stderr, "im here rank %d %s:%d\n", ds->my_rank, __FILE__,__LINE__);
2066
 
                // TODO: check time dimension if there is any
2067
 
                if(current_var->time_dim != -1) {
2068
 
                    uint64_t ti = current_var->time_dim;
2069
 
                    if(futils_is_called_from_fortran()) {
2070
 
                        ti --;
2071
 
                    }
2072
 
                    // TODO: in Fortran index starts from 1 but in C index starts from 0
2073
 
                    if(count[ti] > 1 || start[ti] != current_var->chunks->global_offsets[ti]) {
2074
 
                        adios_error(err_no_data_at_timestep, "Specified time step is not available.");
2075
 
                        return -1;
2076
 
                    }
2077
 
                }
2078
 
 
2079
 
fprintf(stderr, "im here rank %d %s:%d\n", ds->my_rank, __FILE__,__LINE__);
2080
 
                total_size = read_array(ds, current_var, start, count, data);
2081
 
fprintf(stderr, "im here rank %d %s:%d\n", ds->my_rank, __FILE__,__LINE__);
2082
 
 
2083
 
                // rank 0 should be responsible for send data out to peer readers
2084
 
                if(ds->comm_size > 1) {
2085
 
                    if(ds->my_rank == 0) {
2086
 
                        // check if this array is missing on peer readers  
2087
 
                        //if(current_var->num_chunks == 1) {
2088
 
                        int byte_pos = current_var->id / 8;
2089
 
                        int bit_pos = current_var->id % 8;
2090
 
                        unsigned char mask = 0x01 << bit_pos;
2091
 
                        //if(current_var->num_chunks == 1) {
2092
 
                        if((ds->var_bitmap[byte_pos] & mask) != 0x00) {
2093
 
fprintf(stderr, "im here rank %d %s:%d\n", ds->my_rank, __FILE__,__LINE__);
2094
 
                            int rc = MPI_Bcast(data, total_size, MPI_BYTE, 0, ds->comm); 
2095
 
                            if(rc != MPI_SUCCESS) {
2096
 
                                fprintf(stderr, "rank %d: MPI_Bcast() returns error (%d). %s:%d\n",
2097
 
                                    ds->my_rank, rc, __FILE__, __LINE__);
2098
 
                                return -1;
2099
 
                            }
2100
 
                        }
2101
 
                    } 
2102
 
                }
2103
 
                return total_size;
2104
 
            }
2105
 
        }
2106
 
        else {
2107
 
            current_var = current_var->next;
2108
 
fprintf(stderr, "im here rank %d %s:%d\n", ds->my_rank, __FILE__,__LINE__);
2109
 
        }
2110
 
    }
2111
 
fprintf(stderr, "im here rank %d %s:%d\n", ds->my_rank, __FILE__,__LINE__);
2112
 
 
2113
 
    if(!found) {
2114
 
        current_var = ds->vars_peer;
2115
 
        while(current_var) {
2116
 
            if(!compare_var_name(varname, current_var)) {
2117
 
                // found it on remote peer reader
2118
 
                if(!current_var->ndims) { // scalar
2119
 
                    // TODO: check time dimension if there is any
2120
 
                    if(current_var->time_dim != -1 &&
2121
 
                        (gp->fp->tidx_start != start[0] || count[0] > 1)) {
2122
 
                        adios_error(err_no_data_at_timestep, "Specified time step is not available.");
2123
 
                        return -1;
2124
 
                    }
2125
 
 
2126
 
                    total_size = current_var->data_size;
2127
 
                    memcpy(data, current_var->chunks->data, total_size);
2128
 
                    return total_size;
2129
 
                }
2130
 
                else { // arrays
2131
 
                    // TODO: check time dimension if there is any
2132
 
                    if(current_var->time_dim != -1) {
2133
 
                        uint64_t ti = current_var->time_dim;
2134
 
                        // TODO: in Fortran index starts from 1 but in C index starts from 0
2135
 
                        if(futils_is_called_from_fortran()) {
2136
 
                            ti --;
2137
 
                        }
2138
 
 
2139
 
                        if(count[ti] > 1 || start[ti] != current_var->chunks->global_offsets[ti]) {
2140
 
                            adios_error(err_no_data_at_timestep, "Specified time step is not available.");
2141
 
                            return -1;
2142
 
                        }
2143
 
                    }
2144
 
 
2145
 
                    if(ds->my_rank != 0) {
2146
 
                        int i = 0;
2147
 
                        total_size = common_read_type_size(current_var->type, NULL); 
2148
 
                        for(; i < current_var->ndims; i ++) {
2149
 
                             total_size *= count[i];
2150
 
                        }
2151
 
                                     
2152
 
                        if(ds->comm_size > 1) {
2153
 
fprintf(stderr, "im here rank %d %s %ld %s:%d\n", ds->my_rank, current_var->varname,total_size, __FILE__,__LINE__);
2154
 
                            // check if this array is missing on peer readers
2155
 
                            int rc = MPI_Bcast(data, total_size, MPI_BYTE, 0, ds->comm);
2156
 
fprintf(stderr, "im here rank %d %s %ld %s:%d\n", ds->my_rank, current_var->varname,total_size, __FILE__,__LINE__);
2157
 
                            if(rc != MPI_SUCCESS) {
2158
 
                                fprintf(stderr, "rank %d: MPI_Bcast() returns error (%d). %s:%d\n",
2159
 
                                    ds->my_rank, rc, __FILE__, __LINE__);
2160
 
                                return -1;
2161
 
                            }
2162
 
fprintf(stderr, "im here rank %d %s %ld %s:%d\n", ds->my_rank, current_var->varname,total_size, __FILE__,__LINE__);
2163
 
                        }
2164
 
                    }
2165
 
 
2166
 
fprintf(stderr, "im here read rank %d var %s addr %p total size %ld %s:%d\n", ds->my_rank, varname, data,total_size,__FILE__,__LINE__);
2167
 
                    return total_size;
2168
 
                }
2169
 
            }
2170
 
            else {
2171
 
                current_var = current_var->next;
2172
 
            }
2173
 
        }
2174
 
    }
2175
 
 
2176
 
fprintf(stderr, "im here rank %d %s:%d\n", ds->my_rank, __FILE__,__LINE__);
2177
 
    adios_error(err_invalid_varname, "Cannot find var %s\n", varname);
2178
 
    return -1;
2179
 
 
2180
 
 
2181
 
 
2182
 
#if 0
2183
 
    // TODO: search through all pgs to find this var
2184
 
    int p;
2185
 
    for(p = 0; p < ds->num_pgs; p ++) {
2186
 
        datatap_pg_info *current_pg = &(ds->pgs[p]);    
2187
 
        datatap_var_info *var_info = NULL;
2188
 
        int v;
2189
 
        for(v = 0; v < current_pg->num_vars; v ++) {
2190
 
            if(!compare_var_name(varname, &(current_pg->vars[v]))) {     
2191
 
                // found it
2192
 
                found = 1;
2193
 
 
2194
 
                if(!current_pg->vars[v].ndims) { // scalar
2195
 
                    // TODO: check time dimension if there is any
2196
 
                    if(current_pg->vars[v].time_dim != -1 && 
2197
 
                        (gp->fp->tidx_start != start[0] || count[0] > 1)) {
2198
 
                        adios_error(err_no_data_at_timestep, "Specified time step is not available.");
2199
 
                        return -1;
2200
 
                    }
2201
 
  
2202
 
                    total_size = common_read_type_size(current_pg->vars[v].type, current_pg->vars[v].data);                
2203
 
                    memcpy(data, current_pg->vars[v].data, total_size);
2204
 
                    return total_size;
2205
 
                }
2206
 
                else { // arrays
2207
 
                    // TODO: check time dimension if there is any
2208
 
                    if(current_pg->vars[v].time_dim != -1) {
2209
 
                        // TODO: in Fortran index starts from 1 but in C index starts from 0 
2210
 
                        uint64_t ti = current_pg->vars[v].time_dim;
2211
 
                        if(count[ti] > 1 || start[ti] != current_pg->vars[v].global_offsets[ti]) {
2212
 
                            adios_error(err_no_data_at_timestep, "Specified time step is not available.");
2213
 
                            return -1;
2214
 
                        } 
2215
 
                    }
2216
 
 
2217
 
                    // Datatap batches per-variable reads, so here we only record the buffer address
2218
 
                    datatap_var_info *var_info = (datatap_var_info *) malloc(sizeof(datatap_var_info));
2219
 
                    if(!var_info) {
2220
 
                        adios_error(err_no_memory, "Could not allocate memory for group info");
2221
 
                        return -1;
2222
 
                    }                
2223
 
                    memcpy(var_info, &(current_pg->vars[v]), sizeof(datatap_var_info));
2224
 
                    var_info->local_bounds = (uint64_t *) malloc(var_info->ndims * sizeof(uint64_t));
2225
 
                    if(!var_info->local_bounds) {
2226
 
                        adios_error(err_no_memory, "Cannot allocate memory for Datatap.");
2227
 
                        return NULL;
2228
 
                    }
2229
 
                    var_info->global_bounds = (uint64_t *) malloc(var_info->ndims * sizeof(uint64_t));
2230
 
                    if(!var_info->global_bounds) {
2231
 
                        adios_error(err_no_memory, "Cannot allocate memory for Datatap.");
2232
 
                        return NULL;
2233
 
                    }
2234
 
                    var_info->global_offsets = (uint64_t *) malloc(var_info->ndims * sizeof(uint64_t));
2235
 
                    if(!var_info->global_offsets) {
2236
 
                        adios_error(err_no_memory, "Cannot allocate memory for Datatap.");
2237
 
                        return NULL;
2238
 
                    }
2239
 
 
2240
 
                    total_size = 1;
2241
 
                    int i;
2242
 
                    for(i = 0; i < var_info->ndims; i ++) {
2243
 
                        var_info->local_bounds[i] = count[i];
2244
 
                        // TODO: it seems that user won't read from N-dimensional array into M-dimension
2245
 
                        var_info->global_bounds[i] = current_pg->vars[v].global_bounds[i];  
2246
 
                        var_info->global_offsets[i] = start[i];
2247
 
                        total_size = total_size * count[i];
2248
 
                    }
2249
 
                    total_size *= common_read_type_size(var_info->type, NULL);
2250
 
                    var_info->data = data;
2251
 
                    var_info->data_size = total_size;
2252
 
                    
2253
 
                    // now we add this to the list of vars to read
2254
 
                    if(!ds->vars_read) {
2255
 
                        ds->vars_read = var_info;                
2256
 
                        ds->vars_read_tail = var_info;
2257
 
                        var_info->next = NULL;                   
2258
 
                    }
2259
 
                    else {
2260
 
                        ds->vars_read_tail->next = var_info;     
2261
 
                        ds->vars_read_tail = var_info;
2262
 
                        var_info->next = NULL; 
2263
 
                    }
2264
 
                    ds->num_vars_read ++;                  
2265
 
                    return total_size;
2266
 
                }            
2267
 
            }   
2268
 
        }    
2269
 
    }
2270
 
fprintf(stderr, "im here read var %s addr %p %s:%d\n", varname, data,__FILE__,__LINE__);
2271
 
 
2272
 
    if(found) {
2273
 
        return total_size;
2274
 
    }
2275
 
 
2276
 
    adios_error(err_invalid_varname, "Cannot find var %s\n", varname);
2277
 
    return -1;    
2278
 
#endif 
2279
 
 
2280
 
}
2281
 
 
2282
 
int64_t adios_read_datatap_read_var_byid (ADIOS_GROUP *gp, int varid,
2283
 
                                          const uint64_t *start,
2284
 
                                          const uint64_t *count,
2285
 
                                          void *data)
2286
 
{
2287
 
    if(varid >= 0 && varid < gp->vars_count) {
2288
 
fprintf(stderr, "im here %s:%d\n",__FILE__,__LINE__);
2289
 
        return adios_read_datatap_read_var (gp, gp->var_namelist[varid], start, count, data);
2290
 
    }
2291
 
    else {
2292
 
        adios_error(err_invalid_varid, "Cannot find var %d\n", varid);
2293
 
        return -1;
2294
 
    }
2295
 
 
2296
 
}
2297
 
 
2298
 
void adios_read_datatap_reset_dimension_order (ADIOS_FILE *fp, int is_fortran)
2299
 
{
2300
 
    // TODO
2301
 
    adios_error(err_invalid_read_method, "adios_read_datatap_reset_dimension_order is not implemented.");
2302
 
}
2303
 
#endif
2304