~ubuntu-branches/ubuntu/vivid/adios/vivid-proposed

« back to all changes in this revision

Viewing changes to src/write/adios_dimes.c

  • Committer: Package Import Robot
  • Author(s): Alastair McKinstry
  • Date: 2014-06-16 23:06:38 UTC
  • mfrom: (15.1.8 sid)
  • Revision ID: package-import@ubuntu.com-20140616230638-cxryhot6b8ge32l6
Tags: 1.7.0-1
* New upstream release.
* Add adios.pc pkgconfig file. adios_config now uses this.

Show diffs side-by-side

added added

removed removed

Lines of Context:
32
32
#define MAX_NUM_OF_FILES 20
33
33
static char ds_var_name[MAX_DS_NAMELEN];
34
34
static unsigned int adios_dimes_verbose = 3;
 
35
static int check_read_status = 2; // 0: disable, 1: at every step (not supported yet), 2: at finalize (default value)
 
36
static double check_read_status_timeout_sec = 1;
 
37
static int check_read_status_poll_interval_ms = 100;
35
38
 
36
39
struct adios_dimes_file_info
37
40
{
38
41
    char *name;
39
 
    int time_index;
 
42
    int time_index; // versioning, start from 0
40
43
};
41
44
 
42
45
struct adios_dimes_data_struct
44
47
    int rank;   // dataspaces rank or MPI rank if MPI is available
45
48
    int peers;  // from xml parameter or group communicator
46
49
    int appid;  // from xml parameter or 1
47
 
    int time_index; // versioning in DataSpaces, start from 0
48
50
    int n_writes; // how many times adios_write has been called
49
51
#if HAVE_MPI
50
 
    MPI_Comm mpi_comm;
 
52
    MPI_Comm mpi_comm; // for use in open..close
 
53
    MPI_Comm mpi_comm_init; // for use in init/finalize
51
54
#endif
52
55
    int  num_of_files; // how many files do we have with this method
53
56
    char *fnames[MAX_NUM_OF_FILES];  // names of files (needed at finalize)
54
57
    int  fversions[MAX_NUM_OF_FILES];   // last steps of files (needed at finalize)
 
58
    int  mpi_ranks[MAX_NUM_OF_FILES];   // mpi rank of current process for each written file (needed at finalize)
55
59
    struct adios_dimes_file_info file_info[MAX_NUM_OF_FILES]; // keep track of time index for each opened file
56
60
};
57
61
 
58
 
static int init_dimes_file_info(struct adios_dimes_data_struct *p)
59
 
{
60
 
    int i;
61
 
    for (i = 0; i < MAX_NUM_OF_FILES; i++) {
62
 
        p->file_info[i].name = NULL;
63
 
        p->file_info[i].time_index = 0;
64
 
    }
65
 
}
66
 
 
67
 
static void free_dimes_file_info(struct adios_dimes_data_struct *p)
68
 
{
69
 
    int i;
70
 
    for (i = 0; i < MAX_NUM_OF_FILES; i++) {
71
 
        if (p->file_info[i].name) {
72
 
            free(p->file_info[i].name);
 
62
 
 
63
static int check_read_status_var(const char* fname, int last_version)
 
64
{
 
65
    int stay_in_poll_loop = 1;
 
66
    double t1 = adios_gettime();
 
67
 
 
68
    uint64_t lb[MAX_DS_NDIM], ub[MAX_DS_NDIM], gdims[MAX_DS_NDIM];
 
69
    int elemsize, ndim;
 
70
    int read_status_buf[1] = {-1};
 
71
    int read_status_buf_len = 1;
 
72
 
 
73
    while (stay_in_poll_loop) {
 
74
        snprintf(ds_var_name, MAX_DS_NAMELEN, "READ_STATUS@%s", fname);
 
75
        elemsize = sizeof(int); ndim = 1;
 
76
        lb[0] = 0; ub[0] = read_status_buf_len-1;
 
77
        gdims[0] = (ub[0]-lb[0]+1) * dspaces_get_num_space_server();
 
78
        dspaces_define_gdim(ds_var_name, ndim, gdims);
 
79
        int err = dspaces_get(ds_var_name, 0, elemsize, ndim, lb, ub, read_status_buf);
 
80
        if (!err) {
 
81
            int version = read_status_buf[0];
 
82
            log_debug("%s: ds_var_name %s read_status_buf = {%d}\n",
 
83
                __func__, ds_var_name, version);
 
84
            if (version == last_version) {
 
85
                stay_in_poll_loop = 0;
 
86
            }            
 
87
        } else {
 
88
            log_error("%s: failed to read ds_var_name %s from space\n",
 
89
                __func__, ds_var_name); 
 
90
        }
 
91
 
 
92
        // check if we need to stay in loop
 
93
        if (stay_in_poll_loop) {
 
94
            double elapsed_time = adios_gettime() - t1;
 
95
            if (check_read_status_timeout_sec >= 0.0 &&
 
96
                elapsed_time > check_read_status_timeout_sec) {
 
97
                stay_in_poll_loop = 0;
 
98
            } else {
 
99
                adios_nanosleep(check_read_status_poll_interval_ms/1000,
 
100
                    (int)(((uint64_t)check_read_status_poll_interval_ms * 1000000L)%1000000000L));      
 
101
            }
 
102
        }
 
103
    }
 
104
 
 
105
    return 0;
 
106
}
 
107
 
 
108
static int init_dimes_file_info(struct adios_dimes_data_struct *md)
 
109
{
 
110
    int i;
 
111
    for (i = 0; i < MAX_NUM_OF_FILES; i++) {
 
112
        md->file_info[i].name = NULL;
 
113
        md->file_info[i].time_index = 0;
 
114
    }
 
115
}
 
116
 
 
117
static void free_dimes_file_info(struct adios_dimes_data_struct *md)
 
118
{
 
119
    int i;
 
120
    for (i = 0; i < MAX_NUM_OF_FILES; i++) {
 
121
        if (md->file_info[i].name) {
 
122
            free(md->file_info[i].name);
73
123
        }
74
124
    }
75
125
 
76
126
    return;
77
127
}
78
128
 
79
 
static struct adios_dimes_file_info* lookup_dimes_file_info(struct adios_dimes_data_struct *p, const char* fname)
 
129
static struct adios_dimes_file_info* lookup_dimes_file_info(struct adios_dimes_data_struct *md, const char* fname)
80
130
{
81
131
    int i;
82
132
    for (i = 0; i < MAX_NUM_OF_FILES; i++) {
83
 
        if (p->file_info[i].name != NULL &&
84
 
            strcmp(p->file_info[i].name, fname) == 0) {
85
 
            return &p->file_info[i];
 
133
        if (md->file_info[i].name != NULL &&
 
134
            strcmp(md->file_info[i].name, fname) == 0) {
 
135
            return &md->file_info[i];
86
136
        }
87
137
    }
88
138
 
89
139
    for (i = 0; i < MAX_NUM_OF_FILES; i++) {
90
 
        if (p->file_info[i].name == NULL) {
91
 
            p->file_info[i].name = malloc(strlen(fname)+1);
92
 
            strcpy(p->file_info[i].name, fname);
93
 
            return &p->file_info[i];
 
140
        if (md->file_info[i].name == NULL) {
 
141
            md->file_info[i].name = malloc(strlen(fname)+1);
 
142
            strcpy(md->file_info[i].name, fname);
 
143
            return &md->file_info[i];
94
144
        }
95
145
    }
96
146
 
97
147
    return NULL;
98
148
}
99
149
 
100
 
static int connect_to_dimes (struct adios_dimes_data_struct * p, MPI_Comm comm)
 
150
static int connect_to_dimes (struct adios_dimes_data_struct *md, MPI_Comm comm)
101
151
{
102
152
    int ret = 0;
103
153
    int num_peers;
104
154
 
105
155
    if (!globals_adios_is_dimes_connected()) {
106
156
 
107
 
        MPI_Comm_rank (comm, &(p->rank));
 
157
        MPI_Comm_rank (comm, &(md->rank));
108
158
        MPI_Comm_size (comm, &num_peers);
109
159
 
110
160
        // Application ID should be set by the application calling adios_set_application_id()
111
161
        int was_set;
112
 
        p->appid = globals_adios_get_application_id (&was_set);
 
162
        md->appid = globals_adios_get_application_id (&was_set);
113
163
        if (!was_set)
114
 
            p->appid = 1;
 
164
            md->appid = 1;
115
165
 
116
166
        log_debug ("adios_dimes: rank=%d connect to DATASPACES, peers=%d, appid=%d \n",
117
 
                p->rank, num_peers, p->appid);
 
167
                md->rank, num_peers, md->appid);
118
168
 
119
169
        //Init the dart client
120
 
        ret = dspaces_init (num_peers, p->appid);
 
170
        ret = dspaces_init (num_peers, md->appid, &md->mpi_comm_init, NULL);
121
171
        if (ret) {
122
 
            log_error ("adios_dimes: rank=%d Failed to connect to DATASPACES: err=%d,  rank=%d\n", p->rank, ret);        
 
172
            log_error ("adios_dimes: rank=%d Failed to connect to DATASPACES: err=%d,  rank=%d\n", md->rank, ret);        
123
173
            return ret;
124
174
        }
125
175
 
126
176
#if ! HAVE_MPI
127
 
        dspaces_rank (&(p->rank));
128
 
        dspaces_peers (&(p->peers));
 
177
        dspaces_rank (&(md->rank));
 
178
        dspaces_peers (&(md->peers));
129
179
#endif
130
180
 
131
 
        log_debug ("adios_dimes: rank=%d connected to DATASPACES: peers=%d\n", p->rank, p->peers);        
 
181
        log_debug ("adios_dimes: rank=%d connected to DATASPACES: peers=%d\n", md->rank, md->peers);        
132
182
    }
133
183
 
134
184
    globals_adios_set_dimes_connected_from_writer();
140
190
                     struct adios_method_struct * method
141
191
                     )
142
192
{
143
 
    struct adios_dimes_data_struct *p = 0;
 
193
    struct adios_dimes_data_struct *md = 0;
144
194
    if (!adios_dimes_initialized)
145
195
    {
146
196
        adios_dimes_initialized = 1;
147
197
    }
148
198
   
149
199
    method->method_data = calloc (1, sizeof (struct adios_dimes_data_struct));
150
 
    p = (struct adios_dimes_data_struct*)method->method_data;
151
 
    
 
200
    md = (struct adios_dimes_data_struct*)method->method_data;
 
201
   
 
202
    int check_read; 
152
203
    int index, i;
153
204
    char temp[64];
154
205
 
155
206
    //Init the static data structure
156
 
    p->peers = 1;
157
 
    p->appid = -1;
158
 
    p->time_index = 0;
159
 
    p->n_writes = 0;
 
207
    md->peers = 1;
 
208
    md->appid = -1;
 
209
    md->n_writes = 0;
160
210
#if HAVE_MPI
161
 
    p->mpi_comm = MPI_COMM_NULL;
 
211
    md->mpi_comm = MPI_COMM_NULL;
 
212
    md->mpi_comm_init = method->init_comm;
162
213
#endif
163
 
    p->num_of_files = 0;
 
214
    md->num_of_files = 0;
164
215
 
165
 
    init_dimes_file_info(p);
166
 
    connect_to_dimes (p, method->init_comm);
 
216
    // process user parameters
 
217
    const PairStruct *p = parameters;
 
218
    while (p) {
 
219
        if (!strcasecmp (p->name, "app_id")) {
 
220
            errno = 0;
 
221
            md->appid = strtol(p->value, NULL, 10);
 
222
            if (md->appid > 0 && !errno) {
 
223
                log_debug ("App ID parameter set to %d for DIMES write method\n",
 
224
                            md->appid);
 
225
                globals_adios_set_application_id (md->appid);
 
226
            } else {
 
227
                log_error ("Invalid 'app_id' parameter given to the DIMES write "
 
228
                           "method: '%s'\n", p->value);
 
229
            }
 
230
        } else if (!strcasecmp(p->name, "check_read_status")) {
 
231
            errno = 0;
 
232
            check_read = strtol(p->value, NULL, 10);
 
233
            if (!errno && (check_read == 0 || check_read == 2)) {
 
234
                check_read_status = check_read;
 
235
                log_debug("check_read_status set to %d for DIMES write method\n", 
 
236
                    check_read_status);
 
237
            } else {
 
238
                log_error("Invalid 'check_read_status' parameter given to the DIMES "
 
239
                            "write method: '%s'\n", p->value);
 
240
                log_error("check_read_status=<value>, 0: disable, 1: at every step "
 
241
                            " (not supported yet), 2: at finalize (default value).\n");
 
242
            }
 
243
        } else if (!strcasecmp(p->name, "check_read_status_timeout_sec")) {
 
244
            errno = 0;
 
245
            double timeout = strtof(p->value, NULL);
 
246
            if (timeout > 0.0 && !errno) {
 
247
                log_debug("check_read_status_timeout_sec set to %f seconds for DIMES write method\n", timeout);
 
248
                check_read_status_timeout_sec = timeout;
 
249
            } else {
 
250
                log_error("Invalid 'check_read_status_timeout_sec' parameter given to the DIMES "
 
251
                        "write method: '%s'\n", p->value);
 
252
            }   
 
253
        } else if (!strcasecmp(p->name, "check_read_status_poll_interval")) {
 
254
            errno = 0;
 
255
            int pollinterval = strtol(p->value, NULL, 10);
 
256
            if (pollinterval > 0 && !errno) {
 
257
                log_debug("check_read_status_poll_interval set to %d milliseconds for DIMES write method\n", pollinterval);
 
258
                check_read_status_poll_interval_ms = pollinterval;
 
259
            } else {
 
260
                log_error("Invalid 'check_read_status_poll_interval' parameter given to DIMES "
 
261
                    "write method: '%s'\n", p->value);
 
262
            }
 
263
        } else {
 
264
            log_error ("Parameter name %s is not recognized by the DIMES write "
 
265
                        "method\n", p->name);
 
266
        }
 
267
        p = p->next;
 
268
    }
 
269
    init_dimes_file_info(md);
 
270
    connect_to_dimes (md, method->init_comm);
167
271
 
168
272
    log_info ("adios_dimes_init: done\n");
169
273
   
177
281
                    )
178
282
{
179
283
    int ret = 0;
180
 
    struct adios_dimes_data_struct *p = (struct adios_dimes_data_struct *)
 
284
    struct adios_dimes_data_struct *md = (struct adios_dimes_data_struct *)
181
285
                                                method->method_data;
182
 
    struct adios_dimes_file_info *info = lookup_dimes_file_info(p, fd->name);
 
286
    struct adios_dimes_file_info *info = lookup_dimes_file_info(md, fd->name);
183
287
    log_info ("adios_dimes_open: open %s, mode=%d, time_index=%d \n",
184
288
                        fd->name, fd->mode, info->time_index);
185
289
 
186
290
#if HAVE_MPI
187
291
    // if we have MPI and a communicator, we can get the exact size of this application
188
292
    // that we need to tell DATASPACES
189
 
    p->mpi_comm = comm;
190
 
    MPI_Comm_rank (p->mpi_comm, &(p->rank));
191
 
    MPI_Comm_size (p->mpi_comm, &(p->peers));
 
293
    md->mpi_comm = comm;
 
294
    MPI_Comm_rank (md->mpi_comm, &(md->rank));
 
295
    MPI_Comm_size (md->mpi_comm, &(md->peers));
192
296
#endif
193
297
 
194
298
    if (fd->mode == adios_mode_write || fd->mode == adios_mode_append)
195
299
    {
196
 
        log_debug ("adios_dimes_open: rank=%d call write lock...\n", p->rank);       
197
 
        dspaces_lock_on_write (fd->name, &p->mpi_comm);  
198
 
        log_debug ("adios_dimes_open: rank=%d got write lock\n", p->rank);        
 
300
        log_debug ("adios_dimes_open: rank=%d call write lock...\n", md->rank);       
 
301
        dspaces_lock_on_write (fd->name, &md->mpi_comm);  
 
302
        log_debug ("adios_dimes_open: rank=%d got write lock\n", md->rank);        
199
303
        // Free data objects written in the previous steps
200
304
        dimes_put_sync_group(fd->name, info->time_index);
201
305
        dimes_put_set_group(fd->name, info->time_index);    
202
306
    }
203
307
    else if (fd->mode == adios_mode_read)
204
308
    {
205
 
        dspaces_lock_on_read (fd->name, &p->mpi_comm);
 
309
        dspaces_lock_on_read (fd->name, &md->mpi_comm);
206
310
    } 
207
311
  
208
312
    return ret;
233
337
                      ,struct adios_method_struct * method
234
338
                      )
235
339
{
236
 
    struct adios_dimes_data_struct *p = (struct adios_dimes_data_struct *)
 
340
    struct adios_dimes_data_struct *md = (struct adios_dimes_data_struct *)
237
341
                                                            method->method_data;
238
342
    struct adios_group_struct *group = fd->group;
239
 
    struct adios_dimes_file_info *info = lookup_dimes_file_info(p, fd->name);
 
343
    struct adios_dimes_file_info *info = lookup_dimes_file_info(md, fd->name);
240
344
    //Get var size
241
345
    //  FIXME: type size of a string >2GB does not fit to int. 
242
346
    //  adios_get_type_size returns uint64_t but dspaces_put handles only int
246
350
    char * var_name = v->name;
247
351
    int err;
248
352
 
 
353
    char lb_str[256], ub_str[256], gdims_str[256], dims_str[256], didx_str[256];
249
354
    //Get two offset coordinate values
250
355
    unsigned int version;
251
 
 
252
 
    int dims[3]={1,1,1}, gdims[3]={0,0,0}, lb[3]={0,0,0}, ub[3]={0,0,0}; /* lower and upper bounds for DataSpaces */
253
 
    int didx[3]; // for reordering the dimensions
 
356
    uint64_t dims[MAX_DS_NDIM], gdims[MAX_DS_NDIM], lb[MAX_DS_NDIM], ub[MAX_DS_NDIM]; /* lower and upper bounds for DataSpaces */
 
357
    int didx[MAX_DS_NDIM]; // for reordering the dimensions
254
358
    int ndims = 0;
255
359
    int hastime = 0;
 
360
    gdims[0] = 0;
256
361
    struct adios_dimension_struct* var_dimensions = v->dimensions;
257
362
    // Calculate lower and upper bounds for each available dimension (up to 3 dims)
258
 
    while( var_dimensions && ndims < 3)
 
363
    while( var_dimensions && ndims < MAX_DS_NDIM)
259
364
    {
260
365
        dims[ndims] = adios_get_dim_value (&(var_dimensions->dimension));
261
366
        gdims[ndims] = adios_get_dim_value (&(var_dimensions->global_dimension));
262
367
        lb[ndims] = adios_get_dim_value (&(var_dimensions->local_offset));
263
 
        if (dims[ndims] > 0)  {
 
368
        if (gdims[ndims] > 0 && dims[ndims] > 0)  {
264
369
            ub[ndims] = lb[ndims] + dims[ndims] - 1;
265
370
            ndims++;
266
371
        }   else {
287
392
    //snprintf(dspaces_type_var_name, MAX_DS_NAMELEN, "TYPE@%s", ds_var_name);
288
393
    
289
394
    /* non-global variables are put in space ONLY by rank = 0 process */
290
 
    if (gdims[0] == 0 && p->rank != 0) {
291
 
        //fprintf(stderr, "rank=%d var_name=%s is not global. Skip\n", p->rank, ds_var_name);
 
395
    if (gdims[0] == 0 && md->rank != 0) {
 
396
        //fprintf(stderr, "rank=%d var_name=%s is not global. Skip\n", md->rank, ds_var_name);
292
397
        return;
293
398
    }
294
399
 
301
406
    
302
407
     
303
408
    v->write_offset = 1; // only !=0 offsets will be included in build index
 
409
    /* This is not needed here, this is already called in common_adios_write() 
304
410
    adios_generate_var_characteristics_v1 (fd, v); // characteristics will be included in build index
305
411
    adios_write_var_characteristics_v1 (fd, v);
306
 
    
 
412
    */
307
413
 
308
 
    log_debug ("var_name=%s, type=%s(%d) elemsize=%d, version=%d, ndims=%d, size=(%d,%d,%d), gdim=(%d,%d,%d), lb=(%d,%d,%d), ub=(%d,%d,%d)\n",
 
414
    dimes_int64s_to_str(ndims, lb, lb_str);
 
415
    dimes_int64s_to_str(ndims, ub, ub_str);
 
416
    dimes_int64s_to_str(ndims, dims, dims_str);
 
417
    dimes_int64s_to_str(ndims, gdims, gdims_str);
 
418
    log_debug ("var_name=%s, type=%s(%d) elemsize=%d, version=%d, ndims=%d, size=(%s), gdim=(%s), lb=(%s), ub=(%s)\n",
309
419
            ds_var_name, adios_type_to_string_int(v->type), v->type, var_type_size, version, ndims,
310
 
            dims[0], dims[1], dims[2], gdims[0], gdims[1], gdims[2], lb[0], lb[1], lb[2], ub[0], ub[1], ub[2]);
 
420
            dims_str, gdims_str, lb_str, ub_str);    
311
421
 
312
422
    /* non-timed scalars are written in the metadata at close(), not here */
313
423
    if (ndims == 0 && !hastime)
320
430
            group->adios_host_language_fortran == adios_flag_yes, 
321
431
            0 /*pack*/, didx);
322
432
 
323
 
    dimes_put(ds_var_name, version, var_type_size, 
324
 
             lb[didx[0]], lb[didx[1]], lb[didx[2]], 
325
 
             ub[didx[0]], ub[didx[1]], ub[didx[2]], 
326
 
             data);
327
 
    
328
 
    log_debug ("var_name=%s, dimension ordering=(%d,%d,%d), gdims=(%d,%d,%d), lb=(%d,%d,%d), ub=(%d,%d,%d)\n",
329
 
            ds_var_name, 
330
 
            didx[0], didx[1], didx[2], 
331
 
            gdims[didx[0]], gdims[didx[1]], gdims[didx[2]], 
332
 
            lb[didx[0]], lb[didx[1]], lb[didx[2]], 
333
 
            ub[didx[0]], ub[didx[1]], ub[didx[2]]);
 
433
    uint64_t lb_in[MAX_DS_NDIM], ub_in[MAX_DS_NDIM], gdims_in[MAX_DS_NDIM];
 
434
    int i;
 
435
    for (i = 0; i < ndims; i++) {
 
436
        lb_in[i] = lb[didx[i]];
 
437
        ub_in[i] = ub[didx[i]];
 
438
        gdims_in[i] = gdims[didx[i]];
 
439
    }
 
440
    dimes_define_gdim(ds_var_name, ndims, gdims_in);
 
441
    dimes_put(ds_var_name, version, var_type_size, ndims, lb_in, ub_in, data);
 
442
 
 
443
    dimes_ints_to_str(ndims, didx, didx_str);
 
444
    dimes_int64s_to_str(ndims, gdims_in, gdims_str);
 
445
    dimes_int64s_to_str(ndims, lb_in, lb_str);
 
446
    dimes_int64s_to_str(ndims, ub_in, ub_str);
 
447
    log_debug ("var_name=%s, dimension ordering=(%s), gdims=(%s), lb=(%s), ub=(%s)\n",
 
448
            ds_var_name, didx_str, gdims_str, lb_str, ub_str);
334
449
}
335
450
 
336
451
void adios_dimes_get_write_buffer (struct adios_file_struct * fd
402
517
                     ,struct adios_method_struct * method
403
518
                     )
404
519
{
405
 
    struct adios_dimes_data_struct *p = (struct adios_dimes_data_struct *)
406
 
                                                            method->method_data;
407
 
    uint64_t var_type_size = adios_get_type_size(v->type, v->data);
408
 
 
409
 
    //Get var name
410
 
    char * var_name = v->name;
411
 
 
412
 
    //Get two offset coordinate values
413
 
    int version, offset1[3],offset2[3];
414
 
    int dim_size[3];
415
 
    memset(offset1, 0, 3*sizeof(int));
416
 
    memset(offset2, 0, 3*sizeof(int));
417
 
    memset(dim_size, 0, 3*sizeof(int));
418
 
 
419
 
    struct adios_dimes_file_info *info = lookup_dimes_file_info(p, fd->name);
420
 
    version = info->time_index;
421
520
}
422
521
 
423
522
/* Gather var/attr indices from all processes to rank 0 */
426
525
                               ,struct adios_index_struct_v1 * index
427
526
                               )
428
527
{
429
 
    struct adios_dimes_data_struct *p = (struct adios_dimes_data_struct *)
 
528
    struct adios_dimes_data_struct *md = (struct adios_dimes_data_struct *)
430
529
                                                method->method_data;
431
530
    struct adios_index_process_group_struct_v1 * new_pg_root = 0;
432
531
    struct adios_index_var_struct_v1 * new_vars_root = 0;
440
539
#if 0
441
540
#if HAVE_MPI
442
541
    // gather all on rank 0
443
 
    if (p->mpi_comm != MPI_COMM_NULL)
 
542
    if (md->mpi_comm != MPI_COMM_NULL)
444
543
    {                                
445
 
        if (p->rank == 0)           
 
544
        if (md->rank == 0)           
446
545
        {                            
447
 
            int * index_sizes = malloc (4 * p->peers);
448
 
            int * index_offsets = malloc (4 * p->peers);
 
546
            int * index_sizes = malloc (4 * md->peers);
 
547
            int * index_offsets = malloc (4 * md->peers);
449
548
            char * recv_buffer = 0;
450
549
            uint32_t size = 0;
451
550
            uint32_t total_size = 0;
454
553
 
455
554
            MPI_Gather (&size, 1, MPI_INT
456
555
                    ,index_sizes, 1, MPI_INT
457
 
                    ,0, p->mpi_comm
 
556
                    ,0, md->mpi_comm
458
557
                    );
459
558
 
460
 
            for (i = 0; i < p->peers; i++)
 
559
            for (i = 0; i < md->peers; i++)
461
560
            {
462
561
                index_offsets [i] = total_size;
463
562
                total_size += index_sizes [i];
467
566
 
468
567
            MPI_Gatherv (&size, 0, MPI_BYTE
469
568
                    ,recv_buffer, index_sizes, index_offsets
470
 
                    ,MPI_BYTE, 0, p->mpi_comm
 
569
                    ,MPI_BYTE, 0, md->mpi_comm
471
570
                    );
472
571
 
473
 
            for (i = 1; i < p->peers; i++)
 
572
            for (i = 1; i < md->peers; i++)
474
573
            {
475
574
                b.buff = recv_buffer + index_offsets [i];
476
575
                b.length = index_sizes [i];
505
604
 
506
605
            uint32_t tmp_buffer_size = (uint32_t) buffer_size;
507
606
            MPI_Gather (&tmp_buffer_size, 1, MPI_INT, 0, 0, MPI_INT
508
 
                    ,0, p->mpi_comm
 
607
                    ,0, md->mpi_comm
509
608
                    );
510
609
            MPI_Gatherv (buffer, buffer_size, MPI_BYTE
511
610
                    ,0, 0, 0, MPI_BYTE
512
 
                    ,0, p->mpi_comm
 
611
                    ,0, md->mpi_comm
513
612
                    );
514
613
            free (buffer);
515
614
        }
566
665
                                  ,char ** buffer, int *buffer_size, int *nvars, int *nattrs
567
666
                                  )
568
667
{
569
 
    struct adios_dimes_data_struct *p = (struct adios_dimes_data_struct *)
 
668
    struct adios_dimes_data_struct *md = (struct adios_dimes_data_struct *)
570
669
                                                method->method_data;
571
670
    struct adios_index_var_struct_v1 * v = index->vars_root;
572
671
    struct adios_index_attribute_struct_v1 * a = index->attrs_root;
573
672
    int size;
574
673
    int ndims; // whatever the type of v->characteristics->dims.count is, we write an int to buffer
575
674
    int hastime; // true if variable has time dimension
576
 
    uint64_t ldims[10], gdims[10]; // we can write only 3 dimensions, will drop time dim
 
675
    uint64_t ldims[MAX_DS_NDIM], gdims[MAX_DS_NDIM];
577
676
    *nvars = 0;
578
677
    *nattrs = 0;
579
 
    int didx[3]; // dimension ordering indices
 
678
    int didx[MAX_DS_NDIM]; // dimension ordering indices
580
679
 
581
680
    log_debug ("%s entered\n", __func__);
582
681
 
585
684
    while (v) {
586
685
        size += 4*sizeof(int) // name len, type, hastime, number of dims 
587
686
                + dimes_get_full_name_len (v->var_path, v->var_name) // full path
588
 
                + 3 * 8; // always write 3 dimensions in the index (even for scalars)
 
687
                + MAX_DS_NDIM * 8; // always write 3 dimensions in the index (even for scalars)
589
688
        if (v->characteristics->dims.count == 0) {
590
689
            // For scalars, we write the value into the index
591
690
            if (v->type != adios_string)
648
747
        //ndims = MAX(v->characteristics->dims.count,3); // convert whatever type to int
649
748
        //memcpy (b, &(v->characteristics->dims.count), sizeof(int)); // number of dimensions
650
749
        log_debug("Variable %s, total dims = %d\n", name, v->characteristics->dims.count);
651
 
        j = 0; // we can write only 3 dims, will drop the time dimension
 
750
        j = 0; // will drop the time dimension
652
751
        hastime = 0;
653
752
        for (i = 0; i<v->characteristics->dims.count; i++) {
654
753
            ldims[j] = v->characteristics->dims.dims[j*3];  // ith dimension 
663
762
            }
664
763
            j++;
665
764
        }
666
 
        for (i=j; i<3; i++) {
667
 
            // fill up dimensions up to 3rd dim
 
765
        for (i=j; i<MAX_DS_NDIM; i++) {
 
766
            // fill up dimensions up to MAX_DS_NDIM dim
668
767
            ldims[i] = 1;
669
768
            gdims[i] = 1;
670
769
        }
671
 
        ndims = (j < 3 ? j : 3); // we can have max 3 dimensions in DataSpaces
 
770
        ndims = (j < MAX_DS_NDIM ? j : MAX_DS_NDIM); // we can have max MAX_DS_NDIM dimensions in DataSpaces
672
771
        memcpy (b, &hastime, sizeof(int)); // has time dimension?
673
772
        log_debug("             has time = %d (%d)\n", hastime, *(int*)b);
674
773
        b += sizeof(int); 
678
777
        dimes_dimension_ordering(ndims, 
679
778
                fd->group->adios_host_language_fortran == adios_flag_yes, 
680
779
                0 /*pack*/, didx);
681
 
        for (i = 0; i < 3; i++) {
 
780
        for (i = 0; i < MAX_DS_NDIM; i++) {
682
781
            if (gdims[didx[i]]) { 
683
782
                // global variable
684
783
                memcpy (b, &(gdims[didx[i]]), 8);  // ith dimension 
792
891
                      ,struct adios_method_struct * method
793
892
                      )
794
893
{
795
 
    struct adios_dimes_data_struct *p = (struct adios_dimes_data_struct *)
 
894
    struct adios_dimes_data_struct *md = (struct adios_dimes_data_struct *)
796
895
                                                method->method_data;
797
896
    struct adios_index_struct_v1 * index = adios_alloc_index_v1(1);
798
897
    struct adios_attribute_struct * a = fd->group->attributes;
799
 
    struct adios_dimes_file_info *info = lookup_dimes_file_info(p, fd->name);
800
 
    int lb[3], ub[3], didx[3]; // for reordering DS dimensions
 
898
    struct adios_dimes_file_info *info = lookup_dimes_file_info(md, fd->name);
 
899
    uint64_t gdims[MAX_DS_NDIM], lb[MAX_DS_NDIM], ub[MAX_DS_NDIM];
 
900
    int didx[MAX_DS_NDIM]; // for reordering DS dimensions
 
901
    int elemsize, ndim;
801
902
    unsigned int version;
802
903
 
803
904
    if (fd->mode == adios_mode_write || fd->mode == adios_mode_append)
814
915
 
815
916
        // make sure all processes have finished putting data to the space 
816
917
        // before we put metadata from rank 0
817
 
        MPI_Barrier (p->mpi_comm); 
 
918
        MPI_Barrier (md->mpi_comm); 
818
919
 
819
 
        if (p->rank == 0) {
 
920
        if (md->rank == 0) {
820
921
 
821
922
            /* Write two adios specific variables with the name of the file and name of the group into the space */
822
923
            /* ADIOS Read API fopen() checks these variables to see if writing already happened */
836
937
            
837
938
            /* Put GROUP@fn/gn header into space */
838
939
            snprintf(ds_var_name, MAX_DS_NAMELEN, "GROUP@%s/%s", fd->name, fd->group->name);
839
 
            log_debug ("%s: put %s with buf len %d into space\n", __func__, ds_var_name, indexlen);
840
 
            ub[0] = indexlen-1; ub[1] = 0; ub[2] = 0;
841
 
            dimes_dimension_ordering(1, 0, 0, didx); // C ordering of 1D array into DS
842
 
            dspaces_put(ds_var_name, version, 1,    0, 0, 0, /* lb 0..2 */
843
 
                     ub[didx[0]], ub[didx[1]], ub[didx[2]],  indexbuf); 
 
940
            log_debug ("%s: put %s buflen=%d (bytes) into space\n", __func__, ds_var_name, indexlen);
 
941
            elemsize = 1; ndim = 1;
 
942
            lb[0] = 0; ub[0] = indexlen-1;
 
943
            gdims[0] = (ub[0]-lb[0]+1) * dspaces_get_num_space_server();
 
944
            dspaces_define_gdim(ds_var_name, ndim, gdims);
 
945
            dspaces_put(ds_var_name, version, elemsize, ndim, lb, ub, indexbuf);
844
946
            free (indexbuf);
845
947
 
846
948
            /* Create and put FILE@fn header into space */
849
951
            snprintf (ds_var_name, MAX_DS_NAMELEN, "FILE@%s", fd->name);
850
952
            dimes_pack_file_info (info->time_index, nvars, nattrs, indexlen,
851
953
                        fd->group->name, &file_info_buf, &file_info_buf_len);
852
 
            log_debug ("%s: put %s = buflen=%d time=%d nvars=%d nattr=%d index=%d name=%d:%s into space\n",
 
954
            log_debug ("%s: put %s buflen=%d (bytes) time=%d nvars=%d nattr=%d index=%d name=%d:%s into space\n",
853
955
                __func__, ds_var_name, 
854
956
                *(int*)file_info_buf, *(int*)(file_info_buf+4), 
855
957
                *(int*)(file_info_buf+8), *(int*)(file_info_buf+12),
856
958
                *(int*)(file_info_buf+16), *(int*)(file_info_buf+20),
857
959
                file_info_buf+24);
858
 
            /* Flip 1st and 2nd dimension for DataSpaces representation for a 1D array*/
859
 
            ub[0] = file_info_buf_len-1; ub[1] = 0; ub[2] = 0;
860
 
            dimes_dimension_ordering(1, 0, 0, didx); // C ordering of 1D array into DS
861
960
            dspaces_put_sync(); //wait on previous put to finish
862
 
            dspaces_put(ds_var_name, version, 1,    0, 0, 0, /* lb 0..2 */
863
 
                     ub[didx[0]], ub[didx[1]], ub[didx[2]], file_info_buf); 
 
961
            elemsize = 1; ndim = 1;
 
962
            lb[0] = 0; ub[0] = file_info_buf_len-1;
 
963
            gdims[0] = (ub[0]-lb[0]+1) * dspaces_get_num_space_server();
 
964
            dspaces_define_gdim(ds_var_name, ndim, gdims);
 
965
            dspaces_put(ds_var_name, version, elemsize, ndim, lb, ub, file_info_buf);
864
966
 
865
967
            /* Create and put VERSION@fn version info into space */
866
968
            int version_buf[2] = {version, 0}; /* last version put in space; not terminated */
867
969
            int version_buf_len = 2; 
868
970
            snprintf (ds_var_name, MAX_DS_NAMELEN, "VERSION@%s", fd->name);
869
 
            log_debug ("%s: put %s with buf = [%d,%d] (len=%d integers) into space\n", 
 
971
            log_debug ("%s: put %s buf= [%d,%d] buflen=%d (integers) into space\n", 
870
972
                       __func__, ds_var_name, version_buf[0], version_buf[1], version_buf_len);
871
 
            ub[0] = version_buf_len-1; ub[1] = 0; ub[2] = 0;
872
 
            dimes_dimension_ordering(1, 0, 0, didx); // C ordering of 1D array into DS
873
 
            dspaces_put_sync(); //wait on previous put to finish
874
 
            dspaces_put(ds_var_name, 0, sizeof(int),    0, 0, 0, /* lb 0..2 */
875
 
                     ub[didx[0]], ub[didx[1]], ub[didx[2]],  version_buf); 
876
 
            dspaces_put_sync(); //wait on previous put to finish
877
 
            
 
973
            dspaces_put_sync(); //wait on previous put to finish
 
974
            elemsize = sizeof(int); ndim = 1;
 
975
            lb[0] = 0; ub[0] = version_buf_len-1;
 
976
            gdims[0] = (ub[0]-lb[0]+1) * dspaces_get_num_space_server();
 
977
            dspaces_define_gdim(ds_var_name, ndim, gdims);
 
978
            dspaces_put(ds_var_name, 0, elemsize, ndim, lb, ub, version_buf);
 
979
            dspaces_put_sync(); //wait on previous put to finish
878
980
        }
879
981
 
880
982
        // remember this filename and its version for finalize
881
983
        int i;
882
 
        for (i=0; i<p->num_of_files; i++) {
883
 
            if (!strcmp(fd->name, p->fnames[i]))
 
984
        for (i=0; i<md->num_of_files; i++) {
 
985
            if (!strcmp(fd->name, md->fnames[i]))
884
986
                break;
885
987
        }
886
 
        if (i == p->num_of_files) {
887
 
            if (p->num_of_files < MAX_NUM_OF_FILES) {
888
 
                p->fnames[ p->num_of_files ] = strdup(fd->name);
889
 
                p->num_of_files++;
 
988
        if (i == md->num_of_files) {
 
989
            if (md->num_of_files < MAX_NUM_OF_FILES) {
 
990
                md->fnames[ md->num_of_files ] = strdup(fd->name);
 
991
                md->num_of_files++;
890
992
            } else {
891
993
                log_error ("%s: Max %d files can be written by one application "
892
994
                        "using the DATASPACES method\n",
893
995
                        __func__, MAX_NUM_OF_FILES);
894
996
            }
895
997
        }
896
 
        if (i < p->num_of_files) {
897
 
            p->fversions[i] = version;
 
998
        if (i < md->num_of_files) {
 
999
            md->fversions[i] = version;
 
1000
            md->mpi_ranks[i] = md->rank;
898
1001
        }
899
1002
 
900
1003
 
903
1006
        adios_free_index_v1 (index);
904
1007
 
905
1008
        // rank=0 may be in put_sync when others call unlock, which is a global op
906
 
        MPI_Barrier (p->mpi_comm); 
 
1009
        MPI_Barrier (md->mpi_comm); 
907
1010
        //log_debug("%s: call dspaces_put_sync()\n", __func__);
908
1011
        //dspaces_put_sync();
909
1012
        dimes_put_unset_group();
910
1013
        log_debug("%s: call dspaces_unlock_on_write(%s)\n", __func__, fd->name);
911
 
        dspaces_unlock_on_write(fd->name, &p->mpi_comm);
 
1014
        dspaces_unlock_on_write(fd->name, &md->mpi_comm);
912
1015
    }
913
1016
    else if( fd->mode == adios_mode_read )
914
1017
    {
915
 
        dspaces_unlock_on_read(fd->name, &p->mpi_comm);
 
1018
        dspaces_unlock_on_read(fd->name, &md->mpi_comm);
916
1019
    } 
917
1020
 
918
1021
    /* Increment the time index */
924
1027
 
925
1028
void adios_dimes_finalize (int mype, struct adios_method_struct * method)
926
1029
{
927
 
    struct adios_dimes_data_struct *p = (struct adios_dimes_data_struct *)
 
1030
    struct adios_dimes_data_struct *md = (struct adios_dimes_data_struct *)
928
1031
        method->method_data;
929
1032
    int i;
930
1033
    char ds_var_name[MAX_DS_NAMELEN];
931
 
    int lb[3] = {0,0,0}; 
932
 
    int ub[3] = {1,0,0}; // we put 2 integers to space, 
933
 
    int didx[3]; // for reordering DS dimensions
 
1034
    uint64_t gdims[MAX_DS_NDIM], lb[MAX_DS_NDIM], ub[MAX_DS_NDIM];
 
1035
    int elemsize, ndim;
934
1036
    int value[2] = {0, 1}; // integer to be written to space (terminated=1)
935
1037
 
936
 
    free_dimes_file_info(p);
 
1038
    free_dimes_file_info(md);
937
1039
 
938
1040
    // tell the readers which files are finalized
939
 
    dimes_dimension_ordering(1, 0, 0, didx); // C ordering of 1D array into DS
940
 
    for (i=0; i<p->num_of_files; i++) {
 
1041
    for (i=0; i<md->num_of_files; i++) {
941
1042
        /* Put VERSION@fn into space. Indicates that this file will not be extended anymore.  */
942
 
        log_debug("%s: call dspaces_lock_on_write(%s), rank=%d\n", __func__, p->fnames[i], mype);
943
 
        dspaces_lock_on_write(p->fnames[i], &p->mpi_comm); // lock is global operation in DataSpaces
944
 
        if (p->rank == 0) {
945
 
            value[0] = p->fversions[i];
946
 
            snprintf(ds_var_name, MAX_DS_NAMELEN, "VERSION@%s", p->fnames[i]);
 
1043
        if (md->mpi_ranks[i] == 0) {
 
1044
            if (check_read_status == 2) {
 
1045
                check_read_status_var(md->fnames[i], md->fversions[i]);
 
1046
            }
 
1047
            MPI_Comm mpi_comm = MPI_COMM_SELF;
 
1048
            log_debug("%s: call dspaces_lock_on_write(%s), rank=%d\n", __func__, md->fnames[i], mype);
 
1049
            dspaces_lock_on_write(md->fnames[i], &mpi_comm); // lock is global operation in DataSpaces
 
1050
 
 
1051
            value[0] = md->fversions[i];
 
1052
            snprintf(ds_var_name, MAX_DS_NAMELEN, "VERSION@%s", md->fnames[i]);
947
1053
            log_debug ("%s: update %s in the space [%d, %d]\n", 
948
1054
                    __func__, ds_var_name, value[0], value[1] );
949
 
            dspaces_put(ds_var_name, 0, sizeof(int),   
950
 
                    lb[didx[0]], lb[didx[1]], lb[didx[2]], 
951
 
                    ub[didx[0]], ub[didx[1]], ub[didx[2]],  
952
 
                    &value); 
 
1055
            elemsize = sizeof(int); ndim = 1;
 
1056
            lb[0] = 0; ub[0] = 1;
 
1057
            gdims[0] = (ub[0]-lb[0]+1) * dspaces_get_num_space_server();
 
1058
            dspaces_define_gdim(ds_var_name, ndim, gdims);
 
1059
            dspaces_put(ds_var_name, 0, elemsize, ndim, lb, ub, &value);
953
1060
            log_debug("%s: call dspaces_put_sync()\n", __func__);
954
1061
            dspaces_put_sync();
 
1062
 
 
1063
            log_debug("%s: call dspaces_unlock_on_write(%s), rank=%d\n", __func__, md->fnames[i], mype);
 
1064
            dspaces_unlock_on_write(md->fnames[i], &mpi_comm);
955
1065
        }
956
 
        log_debug("%s: call dspaces_unlock_on_write(%s), rank=%d\n", __func__, p->fnames[i], mype);
957
 
        dspaces_unlock_on_write(p->fnames[i], &p->mpi_comm);
958
 
        free (p->fnames[i]);
 
1066
        free(md->fnames[i]);
959
1067
    }
960
1068
 
 
1069
    if (check_read_status == 2) {
 
1070
        // Note: dspaces_lock_on_write() above is only called by single process (whose md->rank == 0). MPI_Barrier ensures all writer processes to wait until reader application fetches data of last version. 
 
1071
        MPI_Barrier(md->mpi_comm_init);
 
1072
    }
961
1073
    // Free all previsouly allocated RDMA buffers
962
1074
    dimes_put_sync_all();
963
1075
 
965
1077
    if (globals_adios_is_dimes_connected_from_writer() && 
966
1078
            !globals_adios_is_dimes_connected_from_both())
967
1079
    {
968
 
        log_debug ("%s: call dspaces_barrier(), rank=%d\n", __func__,mype);
969
 
        dspaces_barrier();
970
 
        log_debug ("%s: call dspaces_finalize(), rank=%d\n", __func__,mype);
 
1080
        log_debug ("%s: call MPI Barrier on all connected processes(), rank=%d\n", __func__,mype);
 
1081
        MPI_Barrier (md->mpi_comm_init); 
 
1082
        log_debug ("%s: call dspaces_finalize(), rank=%d\n", __func__, mype);
971
1083
        dspaces_finalize();
972
1084
    }
973
1085
    globals_adios_set_dimes_disconnected_from_writer();