~ubuntu-branches/ubuntu/utopic/adios/utopic

« back to all changes in this revision

Viewing changes to src/write/adios_dataspaces.c

  • Committer: Package Import Robot
  • Author(s): Alastair McKinstry
  • Date: 2013-12-09 15:21:31 UTC
  • mfrom: (1.1.2)
  • Revision ID: package-import@ubuntu.com-20131209152131-jtd4fpmdv3xnunnm
Tags: 1.5.0-1
* New upstream.
* Standards-Version: 3.9.5
* Include latest config.{sub,guess} 
* New watch file.
* Create libadios-bin for binaries.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#include <unistd.h>
 
2
#include <fcntl.h>
 
3
#include <stdlib.h>
 
4
#include <math.h>
 
5
#include <string.h>
 
6
 
 
7
// xml parser
 
8
#include <mxml.h>
 
9
 
 
10
// for dataspaces ???
 
11
#include <sys/uio.h>
 
12
 
 
13
// see if we have MPI or other tools
 
14
#include "config.h"
 
15
 
 
16
#include "public/adios.h"
 
17
#include "public/adios_types.h"
 
18
#include "core/adios_transport_hooks.h"
 
19
#include "core/adios_internals.h"
 
20
#include "core/adios_internals_mxml.h"
 
21
#include "core/util.h"
 
22
#include "core/ds_metadata.h"
 
23
#include "core/adios_logger.h"
 
24
 
 
25
#include "dataspaces.h"
 
26
 
 
27
/*#define DATASPACES_NO_VERSIONING  define it at configure as -DDATASPACES_NO_VERSIONING in CFLAGS */
 
28
 
 
29
static int adios_dataspaces_initialized = 0;
 
30
#define MAX_DS_NAMELEN 128
 
31
#define MAX_NUM_OF_FILES 20
 
32
//static char ds_type_var_name[MAX_DS_NAMELEN];
 
33
static char ds_var_name[MAX_DS_NAMELEN];
 
34
static unsigned int adios_dataspaces_verbose = 3;
 
35
 
 
36
struct adios_ds_data_struct
 
37
{
 
38
    int rank;   // dataspaces rank or MPI rank if MPI is available
 
39
    int peers;  // from xml parameter or group communicator
 
40
    int appid;  // from xml parameter or 1
 
41
    int time_index; // versioning in DataSpaces, start from 0
 
42
    int n_writes; // how many times adios_write has been called
 
43
#if HAVE_MPI
 
44
    MPI_Comm mpi_comm;
 
45
#endif
 
46
    int  num_of_files; // how many files do we have with this method
 
47
    char *fnames[MAX_NUM_OF_FILES];  // names of files (needed at finalize)
 
48
    int  fversions[MAX_NUM_OF_FILES];   // last steps of files (needed at finalize)
 
49
};
 
50
 
 
51
 
 
52
 
 
53
static int get_dim_rank_value(struct adios_dimension_item_struct * dim_info, struct adios_group_struct *group)
 
54
{
 
55
    if(!dim_info)
 
56
       return 0;
 
57
 
 
58
    if(dim_info->id != 0)
 
59
    {
 
60
        struct adios_var_struct *dim_var = NULL;
 
61
        dim_var = adios_find_var_by_id(group->vars, dim_info->id);
 
62
        if(!dim_var || !dim_var->data)
 
63
           return 0;
 
64
        
 
65
        int rank = 0;
 
66
        switch ( dim_var->type )
 
67
        {
 
68
        case adios_unsigned_byte:
 
69
             rank = (*(uint8_t*) dim_var->data);
 
70
             break;
 
71
        case adios_byte:
 
72
             rank = (*(int8_t*) dim_var->data);
 
73
             break;
 
74
        case adios_unsigned_short:
 
75
             rank = (*(uint16_t*) dim_var->data);
 
76
             break;
 
77
        case adios_short:
 
78
             rank = (*(int16_t*) dim_var->data);
 
79
             break;
 
80
        case adios_unsigned_integer:
 
81
             rank = (*(uint32_t*) dim_var->data);
 
82
             break;
 
83
        case adios_integer:
 
84
             rank = (*(int32_t*) dim_var->data);
 
85
             break;
 
86
        case adios_unsigned_long:
 
87
             rank = (*(uint64_t*) dim_var->data);
 
88
             break;
 
89
        case adios_long:
 
90
             rank = (*(int64_t*) dim_var->data);
 
91
             break;
 
92
 
 
93
        default: break;
 
94
        }
 
95
 
 
96
        return rank;
 
97
    }
 
98
    else
 
99
    {
 
100
        return dim_info->rank;
 
101
    }
 
102
}
 
103
 
 
104
 
 
105
static int connect_to_dspaces (struct adios_ds_data_struct * p, MPI_Comm comm)
 
106
{
 
107
    int ret = 0;
 
108
    int num_peers;
 
109
 
 
110
    if (!globals_adios_is_dataspaces_connected()) {
 
111
 
 
112
        MPI_Comm_rank (comm, &(p->rank));
 
113
        MPI_Comm_size (comm, &num_peers);
 
114
 
 
115
        // Application ID should be set by the application calling adios_set_application_id()
 
116
        int was_set;
 
117
        p->appid = globals_adios_get_application_id (&was_set);
 
118
        if (!was_set)
 
119
            p->appid = 1;
 
120
 
 
121
        log_debug ("adios_dataspaces: rank=%d connect to DATASPACES, peers=%d, appid=%d \n",
 
122
                p->rank, num_peers, p->appid);
 
123
 
 
124
        //Init the dart client
 
125
        ret = dspaces_init (num_peers, p->appid);
 
126
        if (ret) {
 
127
            log_error ("adios_dataspaces: rank=%d Failed to connect to DATASPACES: err=%d,  rank=%d\n", p->rank, ret);        
 
128
            return ret;
 
129
        }
 
130
 
 
131
#if ! HAVE_MPI
 
132
        dspaces_rank (&(p->rank));
 
133
        dspaces_peers (&(p->peers));
 
134
#endif
 
135
 
 
136
        log_debug ("adios_dataspaces: rank=%d connected to DATASPACES: peers=%d\n", p->rank, p->peers);        
 
137
        globals_adios_set_dataspaces_connected_from_writer();
 
138
    }
 
139
    return ret;
 
140
}
 
141
 
 
142
 
 
143
void adios_dataspaces_init (const PairStruct * parameters,
 
144
                     struct adios_method_struct * method
 
145
                     )
 
146
{
 
147
    struct adios_ds_data_struct *p = 0;
 
148
    if (!adios_dataspaces_initialized)
 
149
    {
 
150
        adios_dataspaces_initialized = 1;
 
151
    }
 
152
   
 
153
    method->method_data = calloc (1, sizeof (struct adios_ds_data_struct));
 
154
    p = (struct adios_ds_data_struct*)method->method_data;
 
155
    
 
156
    int index, i;
 
157
    char temp[64];
 
158
 
 
159
    //Init the static data structure
 
160
    p->peers = 1;
 
161
    p->appid = -1;
 
162
    p->time_index = 0;
 
163
    p->n_writes = 0;
 
164
#if HAVE_MPI
 
165
    p->mpi_comm = MPI_COMM_NULL;
 
166
#endif
 
167
    p->num_of_files = 0;
 
168
 
 
169
    connect_to_dspaces (p, method->init_comm);
 
170
 
 
171
    log_info ("adios_dataspaces_init: done\n");
 
172
   
 
173
}
 
174
 
 
175
 
 
176
 
 
177
int adios_dataspaces_open (struct adios_file_struct * fd,
 
178
                    struct adios_method_struct * method,
 
179
                    MPI_Comm comm
 
180
                    )
 
181
{
 
182
    int ret = 0;
 
183
    struct adios_ds_data_struct *p = (struct adios_ds_data_struct *)
 
184
                                                method->method_data;
 
185
  
 
186
    log_info ("adios_dataspaces_open: open %s, mode=%d, time_index=%d \n",
 
187
                        fd->name, fd->mode, p->time_index);
 
188
 
 
189
#if HAVE_MPI
 
190
    // if we have MPI and a communicator, we can get the exact size of this application
 
191
    // that we need to tell DATASPACES
 
192
    p->mpi_comm = comm;
 
193
    MPI_Comm_rank (p->mpi_comm, &(p->rank));
 
194
    MPI_Comm_size (p->mpi_comm, &(p->peers));
 
195
#endif
 
196
 
 
197
    // connect to DATASPACES at the very first adios_open(), disconnect in adios_finalize()
 
198
    // connect only if the READ API has not connected yet
 
199
    /*
 
200
    ret = connect_to_dspaces (p, p->mpi_comm);
 
201
    if (ret)
 
202
        return ret;
 
203
    */
 
204
 
 
205
    if (fd->mode == adios_mode_write || fd->mode == adios_mode_append)
 
206
    {
 
207
        log_debug ("adios_dataspaces_open: rank=%d call write lock...\n", p->rank);        
 
208
        dspaces_lock_on_write (fd->name, &p->mpi_comm);  
 
209
        log_debug ("adios_dataspaces_open: rank=%d got write lock\n", p->rank);        
 
210
    }
 
211
    else if (fd->mode == adios_mode_read)
 
212
    {
 
213
        dspaces_lock_on_read (fd->name, &p->mpi_comm);
 
214
    } 
 
215
  
 
216
    return ret;
 
217
}
 
218
 
 
219
enum ADIOS_FLAG adios_dataspaces_should_buffer (struct adios_file_struct * fd
 
220
                                         ,struct adios_method_struct * method
 
221
                                         )
 
222
{
 
223
    
 
224
    //if (fd->shared_buffer == adios_flag_no && fd->mode != adios_mode_read)
 
225
    //{
 
226
        // write the process group header
 
227
        //adios_write_process_group_header_v1 (fd, fd->write_size_bytes);
 
228
        //adios_write_open_vars_v1 (fd);
 
229
    //} else {
 
230
    //    log_warn("WARNING: %s expects that fd->shared_buffer is false\n", __func__);
 
231
    //}
 
232
    
 
233
 
 
234
    return adios_flag_no;  // this will take care of it
 
235
}
 
236
 
 
237
 
 
238
void adios_dataspaces_write (struct adios_file_struct * fd
 
239
                      ,struct adios_var_struct * v
 
240
                      ,void * data
 
241
                      ,struct adios_method_struct * method
 
242
                      )
 
243
{
 
244
    struct adios_ds_data_struct *p = (struct adios_ds_data_struct *)
 
245
                                                            method->method_data;
 
246
    struct adios_group_struct *group = fd->group;
 
247
    //Get var size
 
248
    //  FIXME: type size of a string >2GB does not fit to int. 
 
249
    //  adios_get_type_size returns uint64_t but dspaces_put handles only int
 
250
    //  as element size
 
251
    int var_type_size = (int) adios_get_type_size(v->type, v->data);
 
252
    //Get var name
 
253
    char * var_name = v->name;
 
254
    int err;
 
255
 
 
256
    //Get two offset coordinate values
 
257
    unsigned int version;
 
258
 
 
259
    int dims[3]={1,1,1}, gdims[3]={0,0,0}, lb[3]={0,0,0}, ub[3]={0,0,0}; /* lower and upper bounds for DataSpaces */
 
260
    int didx[3]; // for reordering the dimensions
 
261
    int ndims = 0;
 
262
    int hastime = 0;
 
263
    struct adios_dimension_struct* var_dimensions = v->dimensions;
 
264
    // Calculate lower and upper bounds for each available dimension (up to 3 dims)
 
265
    while( var_dimensions && ndims < 3)
 
266
    {
 
267
        dims[ndims] = get_dim_rank_value(&(var_dimensions->dimension), group);
 
268
        gdims[ndims] = get_dim_rank_value(&(var_dimensions->global_dimension), group);
 
269
        lb[ndims] = get_dim_rank_value(&(var_dimensions->local_offset), group);
 
270
        if (dims[ndims] > 0)  {
 
271
            ub[ndims] = lb[ndims] + dims[ndims] - 1;
 
272
            ndims++;
 
273
        }   else {
 
274
            // time dimension (ldim=0 indicates this). Leave out from the dimensions.
 
275
            //ub[ndims] = lb[ndims]; 
 
276
            hastime = 1;
 
277
        }
 
278
        var_dimensions = var_dimensions->next;
 
279
    }
 
280
 
 
281
#ifdef DATASPACES_NO_VERSIONING
 
282
    version = 0;              /* Update/overwrite data in DataSpaces  (we write time_index as a variable at close)*/
 
283
#else
 
284
    version = p->time_index;  /* Add new data as separate to DataSpaces */
 
285
#endif
 
286
    
 
287
    if (v->path != NULL && v->path[0] != '\0' && strcmp(v->path,"/")) 
 
288
        snprintf(ds_var_name, MAX_DS_NAMELEN, "%s/%s/%s/%s", fd->name, fd->group->name, v->path, v->name);
 
289
    else 
 
290
        snprintf(ds_var_name, MAX_DS_NAMELEN, "%s/%s//%s", fd->name, fd->group->name, v->name);
 
291
 
 
292
    //snprintf(dspaces_type_var_name, MAX_DS_NAMELEN, "TYPE@%s", ds_var_name);
 
293
    
 
294
    /* non-global variables are put in space ONLY by rank = 0 process */
 
295
    if (gdims[0] == 0 && p->rank != 0) {
 
296
        //fprintf(stderr, "rank=%d var_name=%s is not global. Skip\n", p->rank, ds_var_name);
 
297
        return;
 
298
    }
 
299
 
 
300
    
 
301
    //if (fd->shared_buffer == adios_flag_no)
 
302
    //{
 
303
        // var payload sent for sizing information
 
304
        //adios_write_var_header_v1 (fd, v);
 
305
    //}
 
306
    
 
307
     
 
308
    v->write_offset = 1; // only !=0 offsets will be included in build index
 
309
    adios_generate_var_characteristics_v1 (fd, v); // characteristics will be included in build index
 
310
    adios_write_var_characteristics_v1 (fd, v);
 
311
    
 
312
 
 
313
    log_debug ("var_name=%s, type=%s(%d) elemsize=%d, version=%d, ndims=%d, size=(%d,%d,%d), gdim=(%d,%d,%d), lb=(%d,%d,%d), ub=(%d,%d,%d)\n",
 
314
            ds_var_name, adios_type_to_string_int(v->type), v->type, var_type_size, version, ndims,
 
315
            dims[0], dims[1], dims[2], gdims[0], gdims[1], gdims[2], lb[0], lb[1], lb[2], ub[0], ub[1], ub[2]);
 
316
 
 
317
    /* non-timed scalars are written in the metadata at close(), not here */
 
318
    if (ndims == 0 && !hastime)
 
319
        return;
 
320
 
 
321
    /* Put type info as T<varname>, integer in 0,0,0,0,0,0 position */
 
322
    //err = dspaces_put(dspaces_type_var_name, version, 4, 0,0,0,0,0,0, &(v->type)); 
 
323
 
 
324
    ds_dimension_ordering(ndims,
 
325
            group->adios_host_language_fortran == adios_flag_yes, 
 
326
            0 /*pack*/, didx);
 
327
 
 
328
    dspaces_put(ds_var_name, version, var_type_size, 
 
329
             lb[didx[0]], lb[didx[1]], lb[didx[2]], 
 
330
             ub[didx[0]], ub[didx[1]], ub[didx[2]], 
 
331
             data);
 
332
    
 
333
    log_debug ("var_name=%s, dimension ordering=(%d,%d,%d), gdims=(%d,%d,%d), lb=(%d,%d,%d), ub=(%d,%d,%d)\n",
 
334
            ds_var_name, 
 
335
            didx[0], didx[1], didx[2], 
 
336
            gdims[didx[0]], gdims[didx[1]], gdims[didx[2]], 
 
337
            lb[didx[0]], lb[didx[1]], lb[didx[2]], 
 
338
            ub[didx[0]], ub[didx[1]], ub[didx[2]]);
 
339
    dspaces_put_sync();
 
340
}
 
341
 
 
342
void adios_dataspaces_get_write_buffer (struct adios_file_struct * fd
 
343
                                 ,struct adios_var_struct * v
 
344
                                 ,uint64_t * size
 
345
                                 ,void ** buffer
 
346
                                 ,struct adios_method_struct * method
 
347
                                 )
 
348
{
 
349
    uint64_t mem_allowed;
 
350
 
 
351
    if (*size == 0)
 
352
    {
 
353
        *buffer = 0;
 
354
 
 
355
        return;
 
356
    }
 
357
 
 
358
    if (v->data && v->free_data == adios_flag_yes)
 
359
    {
 
360
        adios_method_buffer_free (v->data_size);
 
361
        free (v->data);
 
362
        v->data = NULL;
 
363
    }
 
364
 
 
365
    mem_allowed = adios_method_buffer_alloc (*size);
 
366
    if (mem_allowed == *size)
 
367
    {
 
368
        *buffer = malloc (*size);
 
369
        if (!*buffer)
 
370
        {
 
371
            adios_method_buffer_free (mem_allowed);
 
372
            log_error ("ERROR: Out of memory allocating %llu bytes for %s in %s:%s()\n"
 
373
                    ,*size, v->name, __FILE__, __func__
 
374
                    );
 
375
            v->got_buffer = adios_flag_no;
 
376
            v->free_data = adios_flag_no;
 
377
            v->data_size = 0;
 
378
            v->data = 0;
 
379
            *size = 0;
 
380
            *buffer = 0;
 
381
        }
 
382
        else
 
383
        {
 
384
            v->got_buffer = adios_flag_yes;
 
385
            v->free_data = adios_flag_yes;
 
386
            v->data_size = mem_allowed;
 
387
            v->data = *buffer;
 
388
        }
 
389
    }
 
390
    else
 
391
    {
 
392
        adios_method_buffer_free (mem_allowed);
 
393
        log_error ("OVERFLOW: Cannot allocate requested buffer of %llu "
 
394
                         "bytes for %s in %s:%s()\n"
 
395
                ,*size
 
396
                ,v->name
 
397
                ,__FILE__, __func__
 
398
                );
 
399
        *size = 0;
 
400
        *buffer = 0;
 
401
    }
 
402
}
 
403
 
 
404
/* NOT IMPLEMENTED. Use the Read API to read variables */
 
405
void adios_dataspaces_read (struct adios_file_struct * fd
 
406
                     ,struct adios_var_struct * v, void * buffer
 
407
                     ,uint64_t buffer_size
 
408
                     ,struct adios_method_struct * method
 
409
                     )
 
410
{
 
411
    struct adios_ds_data_struct *p = (struct adios_ds_data_struct *)
 
412
                                                            method->method_data;
 
413
    uint64_t var_type_size = adios_get_type_size(v->type, v->data);
 
414
 
 
415
    //Get var name
 
416
    char * var_name = v->name;
 
417
 
 
418
    //Get two offset coordinate values
 
419
    int version, offset1[3],offset2[3];
 
420
    int dim_size[3];
 
421
    memset(offset1, 0, 3*sizeof(int));
 
422
    memset(offset2, 0, 3*sizeof(int));
 
423
    memset(dim_size, 0, 3*sizeof(int));
 
424
 
 
425
    version = p->time_index;
 
426
 
 
427
    //dspaces_lock_on_read_();
 
428
 
 
429
    //dspaces_get
 
430
 
 
431
    //dspaces_unlock_on_read_();
 
432
}
 
433
 
 
434
/* Gather var/attr indices from all processes to rank 0 */
 
435
static void adios_dataspaces_gather_indices (struct adios_file_struct * fd
 
436
                               ,struct adios_method_struct * method
 
437
                               ,struct adios_index_process_group_struct_v1 **pg_root 
 
438
                               ,struct adios_index_var_struct_v1 **vars_root
 
439
                               ,struct adios_index_attribute_struct_v1 ** attrs_root
 
440
                               )
 
441
{
 
442
    struct adios_ds_data_struct *p = (struct adios_ds_data_struct *)
 
443
                                                method->method_data;
 
444
    struct adios_index_process_group_struct_v1 * my_pg_root = 0;
 
445
    struct adios_index_var_struct_v1 * my_vars_root = 0;
 
446
    struct adios_index_attribute_struct_v1 * my_attrs_root = 0;
 
447
    struct adios_index_process_group_struct_v1 * new_pg_root = 0;
 
448
    struct adios_index_var_struct_v1 * new_vars_root = 0;
 
449
    struct adios_index_attribute_struct_v1 * new_attrs_root = 0;
 
450
    
 
451
    // build local index first appending to any existing index
 
452
    adios_build_index_v1 (fd, &my_pg_root, &my_vars_root, &my_attrs_root);
 
453
 
 
454
    log_debug ("%s index after first build is pg=%x vars=%x attrs=%x\n", 
 
455
                __func__, my_pg_root, my_vars_root, my_attrs_root);
 
456
#if 0
 
457
#if HAVE_MPI
 
458
    // gather all on rank 0
 
459
    if (p->mpi_comm != MPI_COMM_NULL)
 
460
    {                                
 
461
        if (p->rank == 0)           
 
462
        {                            
 
463
            int * index_sizes = malloc (4 * p->peers);
 
464
            int * index_offsets = malloc (4 * p->peers);
 
465
            char * recv_buffer = 0;
 
466
            uint32_t size = 0;
 
467
            uint32_t total_size = 0;
 
468
            int i;
 
469
            struct adios_bp_buffer_struct_v1 b;
 
470
 
 
471
            MPI_Gather (&size, 1, MPI_INT
 
472
                    ,index_sizes, 1, MPI_INT
 
473
                    ,0, p->mpi_comm
 
474
                    );
 
475
 
 
476
            for (i = 0; i < p->peers; i++)
 
477
            {
 
478
                index_offsets [i] = total_size;
 
479
                total_size += index_sizes [i];
 
480
            }                    
 
481
 
 
482
            recv_buffer = malloc (total_size);
 
483
 
 
484
            MPI_Gatherv (&size, 0, MPI_BYTE
 
485
                    ,recv_buffer, index_sizes, index_offsets
 
486
                    ,MPI_BYTE, 0, p->mpi_comm
 
487
                    );
 
488
 
 
489
            for (i = 1; i < p->peers; i++)
 
490
            {
 
491
                b.buff = recv_buffer + index_offsets [i];
 
492
                b.length = index_sizes [i];
 
493
                b.offset = 0;
 
494
 
 
495
                adios_parse_process_group_index_v1 (&b
 
496
                        ,&new_pg_root
 
497
                        );
 
498
                adios_parse_vars_index_v1 (&b, &new_vars_root);
 
499
                adios_parse_attributes_index_v1 (&b
 
500
                        ,&new_attrs_root
 
501
                        );
 
502
                adios_merge_index_v1 (&my_pg_root
 
503
                        ,&my_vars_root
 
504
                        ,&my_attrs_root
 
505
                        ,new_pg_root, new_vars_root
 
506
                        ,new_attrs_root
 
507
                        );
 
508
                adios_clear_index_v1 (new_pg_root, new_vars_root, new_attrs_root);
 
509
                new_pg_root = 0;
 
510
                new_vars_root = 0;
 
511
                new_attrs_root = 0;
 
512
            }
 
513
 
 
514
            free (recv_buffer);
 
515
            free (index_sizes);
 
516
            free (index_offsets);
 
517
        }
 
518
        else
 
519
        {
 
520
            char * buffer = 0; 
 
521
            uint64_t buffer_size = 0;
 
522
            uint64_t buffer_offset = 0;
 
523
 
 
524
            adios_write_index_v1 (&buffer, &buffer_size, &buffer_offset
 
525
                    ,0, my_pg_root ,my_vars_root ,my_attrs_root);
 
526
 
 
527
            uint32_t tmp_buffer_size = (uint32_t) buffer_size;
 
528
            MPI_Gather (&tmp_buffer_size, 1, MPI_INT, 0, 0, MPI_INT
 
529
                    ,0, p->mpi_comm
 
530
                    );
 
531
            MPI_Gatherv (buffer, buffer_size, MPI_BYTE
 
532
                    ,0, 0, 0, MPI_BYTE
 
533
                    ,0, p->mpi_comm
 
534
                    );
 
535
            free (buffer);
 
536
        }
 
537
    }
 
538
 
 
539
#endif
 
540
#endif
 
541
 
 
542
    *pg_root = my_pg_root;
 
543
    *vars_root = my_vars_root;
 
544
    *attrs_root = my_attrs_root;
 
545
    log_debug ("%s index after gathering is pg=%x vars=%x attrs=%x\n", 
 
546
                __func__, my_pg_root, my_vars_root, my_attrs_root);
 
547
}
 
548
 
 
549
static int ds_get_full_name_len (char * path, char * name)
 
550
{
 
551
    int len;
 
552
    // make full name
 
553
    if (!path || !path[0] || !strcmp (path, "/")) { 
 
554
        // no path, just name + leading /
 
555
        len = strlen(name) + 1;
 
556
    } else {
 
557
        len = strlen(path) + strlen(name) + 1;
 
558
    }
 
559
    return len;
 
560
}
 
561
 
 
562
static int ds_get_full_name (char * path, char * name, int maxlen,
 
563
                            /*OUT*/char * out)
 
564
{
 
565
    int len;
 
566
    // make full name
 
567
    if (!path || !path[0] || !strcmp (path, "/")) { 
 
568
        // no path, just name + leading /
 
569
        len = strlen(name) + 1;
 
570
        out[0] = '/';
 
571
        strncpy(out+1, name, maxlen-1);
 
572
    } else {
 
573
        len = strlen(path);
 
574
        strncpy(out, path, maxlen-1);  // path +
 
575
        out[len] = '/';                         //   /  +
 
576
        strncpy(out+len+1, name, maxlen-len-1); // name
 
577
        len += strlen(name) + 1;
 
578
    }
 
579
    return len;
 
580
}
 
581
 
 
582
void ds_pack_group_info (struct adios_file_struct *fd
 
583
                                  ,struct adios_method_struct * method
 
584
                                  ,struct adios_index_var_struct_v1 *vars_root
 
585
                                  ,struct adios_index_attribute_struct_v1 * attrs_root
 
586
                                  ,char ** buffer, int *buffer_size, int *nvars, int *nattrs
 
587
                                  )
 
588
{
 
589
    struct adios_ds_data_struct *p = (struct adios_ds_data_struct *)
 
590
                                                method->method_data;
 
591
    struct adios_index_var_struct_v1 * v = vars_root;
 
592
    struct adios_index_attribute_struct_v1 * a = attrs_root;
 
593
    int size;
 
594
    int ndims; // whatever the type of v->characteristics->dims.count is, we write an int to buffer
 
595
    int hastime; // true if variable has time dimension
 
596
    uint64_t ldims[10], gdims[10]; // we can write only 3 dimensions, will drop time dim
 
597
    *nvars = 0;
 
598
    *nattrs = 0;
 
599
    int didx[3]; // dimension ordering indices
 
600
 
 
601
    log_debug ("%s entered\n", __func__);
 
602
 
 
603
    /* First cycle: count the size of info to allocate index buffer */
 
604
    size = 3*sizeof(int); //header for buffer: length, nvars, nattrs
 
605
    while (v) {
 
606
        size += 4*sizeof(int) // name len, type, hastime, number of dims 
 
607
                + ds_get_full_name_len (v->var_path, v->var_name) // full path
 
608
                + 3 * 8; // always write 3 dimensions in the index (even for scalars)
 
609
        if (v->characteristics->dims.count == 0) {
 
610
            // For scalars, we write the value into the index
 
611
            if (v->type != adios_string)
 
612
                size += adios_get_type_size(v->type, NULL);
 
613
            else
 
614
                size += adios_get_type_size(v->type, v->characteristics->value) + sizeof(int);
 
615
        }
 
616
        log_debug (" var %s/%s, size = %d\n", v->var_path, v->var_name, size);
 
617
        (*nvars)++;
 
618
        v = v->next;
 
619
    }
 
620
 
 
621
    while (a) {
 
622
        size += sizeof(int) 
 
623
                + ds_get_full_name_len (a->attr_path, a->attr_name)
 
624
                + sizeof(int); // type
 
625
        if (a->type != adios_string)
 
626
            size += adios_get_type_size(a->type, NULL);
 
627
        else
 
628
            size += adios_get_type_size(a->type, a->characteristics->value) + sizeof(int);
 
629
 
 
630
        log_debug (" attr %s/%s, size = %d\n", a->attr_path, a->attr_name, size);
 
631
        (*nattrs)++;
 
632
        a = a->next;
 
633
    }
 
634
 
 
635
    // Required for Cray Gemini: align buffer to 8 bytes boundaries
 
636
    int align_bytes = 0; // number of extra bytes at the end
 
637
    if (size % 8) {
 
638
        align_bytes = 8 - (size % 8);
 
639
        size += align_bytes;
 
640
        log_debug (" after alignment, size = %d, align_bytes = %d\n", size, align_bytes);
 
641
    }
 
642
 
 
643
    *buffer = (char *) malloc (size);
 
644
    *buffer_size = size;
 
645
 
 
646
    /* Second cycle: fill up the buffer */
 
647
    v = vars_root;
 
648
    a = attrs_root;
 
649
    char * b = *buffer;
 
650
    int i, j, namelen;
 
651
    char name[256];
 
652
 
 
653
    //header for buffer: length, nvars, nattrs
 
654
    memcpy (b, buffer_size, sizeof(int));  
 
655
    b += sizeof(int); 
 
656
    memcpy (b, nvars, sizeof(int));  
 
657
    b += sizeof(int); 
 
658
    memcpy (b, nattrs, sizeof(int));  
 
659
    b += sizeof(int); 
 
660
    while (v) {
 
661
        namelen = ds_get_full_name (v->var_path, v->var_name, sizeof(name), name);
 
662
        memcpy (b, &namelen, sizeof(int));  // length of full path
 
663
        b += sizeof(int); 
 
664
        memcpy (b, name, namelen);          // full path
 
665
        b += namelen;
 
666
        memcpy (b, &(v->type), sizeof(int)); // type 
 
667
        b += sizeof(int); 
 
668
        //ndims = MAX(v->characteristics->dims.count,3); // convert whatever type to int
 
669
        //memcpy (b, &(v->characteristics->dims.count), sizeof(int)); // number of dimensions
 
670
        log_debug("Variable %s, total dims = %d\n", name, v->characteristics->dims.count);
 
671
        j = 0; // we can write only 3 dims, will drop the time dimension
 
672
        hastime = 0;
 
673
        for (i = 0; i<v->characteristics->dims.count; i++) {
 
674
            ldims[j] = v->characteristics->dims.dims[j*3];  // ith dimension 
 
675
            gdims[j] = v->characteristics->dims.dims[j*3+1];  // ith dimension 
 
676
            log_debug("           , ldim = %lld gdim = %lld)\n", ldims[j], gdims[j]);
 
677
            if (gdims[j] == 0 && ldims[j] == 1) {
 
678
                // time dimension's global=0, local=1, skip
 
679
                // FIXME: This is true for a local array of length 1 (not defined as global)
 
680
                log_debug("               skip this dimension )\n");
 
681
                hastime = 1;
 
682
                continue;
 
683
            }
 
684
            j++;
 
685
        }
 
686
        for (i=j; i<3; i++) {
 
687
            // fill up dimensions up to 3rd dim
 
688
            ldims[i] = 1;
 
689
            gdims[i] = 1;
 
690
        }
 
691
        ndims = (j < 3 ? j : 3); // we can have max 3 dimensions in DataSpaces
 
692
        memcpy (b, &hastime, sizeof(int)); // has time dimension?
 
693
        log_debug("             has time = %d (%d)\n", hastime, *(int*)b);
 
694
        b += sizeof(int); 
 
695
        memcpy (b, &ndims, sizeof(int)); // number of dimensions
 
696
        log_debug("             ndims = %d (%d)\n", ndims, *(int*)b);
 
697
        b += sizeof(int); 
 
698
        ds_dimension_ordering(ndims, 
 
699
                fd->group->adios_host_language_fortran == adios_flag_yes, 
 
700
                0 /*pack*/, didx);
 
701
        for (i = 0; i < 3; i++) {
 
702
            if (gdims[didx[i]]) { 
 
703
                // global variable
 
704
                memcpy (b, &(gdims[didx[i]]), 8);  // ith dimension 
 
705
            } else { 
 
706
                // a local variable has no global dimensions
 
707
                // in space, its local dimensions become the global dimensions
 
708
                memcpy (b, &(ldims[didx[i]]), 8);  // ith dimension 
 
709
            }
 
710
            b += 8; 
 
711
        }
 
712
        if (v->characteristics->dims.count == 0) {
 
713
            // NOTE: ndims = 0 can mean a timed scalar, which has no characteristics->value!
 
714
            // store scalar value too
 
715
            if (v->type != adios_string) {
 
716
                size = adios_get_type_size(v->type, NULL);
 
717
                memcpy (b, v->characteristics->value, size); 
 
718
                b += size; 
 
719
            } else {
 
720
                size = adios_get_type_size(v->type, v->characteristics->value);
 
721
                memcpy (b, &size, sizeof(int)); 
 
722
                b += sizeof(int); 
 
723
                memcpy (b, v->characteristics->value, size); 
 
724
                b += size; 
 
725
            }
 
726
        }
 
727
        v = v->next;
 
728
    }
 
729
 
 
730
    while (a) {
 
731
        namelen = ds_get_full_name (a->attr_path, a->attr_name, sizeof(name), name);
 
732
        memcpy (b, &namelen, sizeof(int));  // length of full path
 
733
        b += sizeof(int); 
 
734
        memcpy (b, name, namelen);          // full path
 
735
        b += namelen;
 
736
        memcpy (b, &(a->type), sizeof(int)); // type 
 
737
        b += sizeof(int); 
 
738
        // store scalar value too
 
739
        if (a->type != adios_string) {
 
740
            size = adios_get_type_size(a->type, NULL);
 
741
            memcpy (b, a->characteristics->value, size); 
 
742
            b += size; 
 
743
        } else {
 
744
            size = adios_get_type_size(a->type, a->characteristics->value);
 
745
            memcpy (b, &size, sizeof(int)); 
 
746
            b += sizeof(int); 
 
747
            memcpy (b, a->characteristics->value, size); 
 
748
            b += size; 
 
749
        }
 
750
        a = a->next;
 
751
    }
 
752
 
 
753
    // alignment
 
754
    if (align_bytes) {
 
755
        uint64_t zero = 0;
 
756
        memcpy (b, &zero, align_bytes); 
 
757
        b += align_bytes;
 
758
    }
 
759
 
 
760
    // sanity check
 
761
    if ( (int)(b-*buffer) > *buffer_size) {
 
762
        log_error ("ERROR in %s. Calculated group index buffer size as %d, but filled after that with %d bytes\n",
 
763
            __func__, *buffer_size, (int)(b-*buffer));
 
764
    }
 
765
    // written buffer might be shorter than calculated since we skip time dimensions.
 
766
    // set the correct size now
 
767
    *buffer_size = (int)(b-*buffer);
 
768
    memcpy (*buffer, buffer_size, sizeof(int));  
 
769
 
 
770
    
 
771
    log_debug("   %s: buffer length = %d, content:\n", __func__, *buffer_size);
 
772
    b = *buffer;
 
773
    for (i=0; i<*buffer_size; i+=16) {
 
774
        for (j=0; j<4; j++) {
 
775
            log_debug_cont ("%3.3hhu %3.3hhu %3.3hhu %3.3hhu    ", 
 
776
                            b[i+4*j], b[i+4*j+1], b[i+4*j+2], b[i+4*j+3]);
 
777
        }
 
778
        log_debug_cont("\n");
 
779
    }
 
780
    
 
781
 
 
782
    log_debug ("%s exit\n", __func__);
 
783
}
 
784
 
 
785
/* FIXME: put this function into ds_metadata.c */
 
786
/* buff is allocated and must be freed after use */
 
787
void ds_pack_file_info (int time, int nvars, int nattrs, int group_index_len, char * groupname, 
 
788
                    /*OUT*/char **buf, /*OUT*/int *buf_len)
 
789
{
 
790
    *buf_len = 128;
 
791
    *buf = (char *) malloc (*buf_len);
 
792
 
 
793
    char *b = *buf;
 
794
    int namelen = strlen(groupname);
 
795
    memcpy (b, buf_len, sizeof(int));  /* 0-: length of this buffer */
 
796
    b += sizeof(int);
 
797
    memcpy (b, &time, sizeof(int));  /* 4-: time */
 
798
    b += sizeof(int); 
 
799
    memcpy (b, &nvars, sizeof(int));  /* 8-: number of variables */
 
800
    b += sizeof(int);
 
801
    memcpy (b, &nattrs, sizeof(int));  /* 12-: number of attributes */
 
802
    b += sizeof(int);
 
803
    memcpy (b, &group_index_len, sizeof(int));  /* 16-: length of group index*/
 
804
    b += sizeof(int);
 
805
    memcpy (b, &namelen, sizeof(int));  /* 20-: length of group name */
 
806
    b += sizeof(int);
 
807
    memcpy (b, groupname, namelen);  /* 24-: group name */
 
808
    b[namelen] = 0;
 
809
}
 
810
 
 
811
void adios_dataspaces_close (struct adios_file_struct * fd
 
812
                      ,struct adios_method_struct * method
 
813
                      )
 
814
{
 
815
    struct adios_ds_data_struct *p = (struct adios_ds_data_struct *)
 
816
                                                method->method_data;
 
817
    struct adios_index_process_group_struct_v1 * pg_root;
 
818
    struct adios_index_var_struct_v1 * vars_root;
 
819
    struct adios_index_attribute_struct_v1 * attrs_root;
 
820
    struct adios_attribute_struct * a = fd->group->attributes;
 
821
    int lb[3], ub[3], didx[3]; // for reordering DS dimensions
 
822
    unsigned int version;
 
823
 
 
824
    if (fd->mode == adios_mode_write || fd->mode == adios_mode_append)
 
825
    {
 
826
        // finalize variable info in fd buffer, next we call build_index
 
827
        while (a) {
 
828
            a->write_offset = 1; // only attributes with !=0 offset will be included in build index
 
829
            a=a->next;
 
830
        }
 
831
 
 
832
        //adios_write_close_vars_v1 (fd);
 
833
        /* Gather var/attr indices from all processes to rank 0 */
 
834
        adios_dataspaces_gather_indices (fd, method, &pg_root, &vars_root ,&attrs_root);
 
835
 
 
836
        // make sure all processes have finished putting data to the space 
 
837
        // before we put metadata from rank 0
 
838
        MPI_Barrier (p->mpi_comm); 
 
839
 
 
840
        if (p->rank == 0) {
 
841
 
 
842
            /* Write two adios specific variables with the name of the file and name of the group into the space */
 
843
            /* ADIOS Read API fopen() checks these variables to see if writing already happened */
 
844
#ifdef DATASPACES_NO_VERSIONING
 
845
            version = 0;              /* Update/overwrite data in DataSpaces */
 
846
#else
 
847
            version = p->time_index;  /* Add new data as separate to DataSpaces */
 
848
#endif
 
849
 
 
850
            /* Make metadata from indices */
 
851
            char * indexbuf;
 
852
            int    indexlen;
 
853
            int    nvars, nattrs;
 
854
            ds_pack_group_info (fd, method, vars_root, attrs_root, 
 
855
                                   &indexbuf, &indexlen, &nvars, &nattrs);
 
856
 
 
857
            
 
858
            /* Put GROUP@fn/gn header into space */
 
859
            snprintf(ds_var_name, MAX_DS_NAMELEN, "GROUP@%s/%s", fd->name, fd->group->name);
 
860
            log_debug ("%s: put %s with buf len %d into space\n", __func__, ds_var_name, indexlen);
 
861
            ub[0] = indexlen-1; ub[1] = 0; ub[2] = 0;
 
862
            ds_dimension_ordering(1, 0, 0, didx); // C ordering of 1D array into DS
 
863
            dspaces_put(ds_var_name, version, 1,    0, 0, 0, /* lb 0..2 */
 
864
                     ub[didx[0]], ub[didx[1]], ub[didx[2]],  indexbuf); 
 
865
            free (indexbuf);
 
866
 
 
867
            /* Create and put FILE@fn header into space */
 
868
            char * file_info_buf; /* store FILE@fn's group list */
 
869
            int    file_info_buf_len; /* = 128 currently */
 
870
            snprintf (ds_var_name, MAX_DS_NAMELEN, "FILE@%s", fd->name);
 
871
            ds_pack_file_info (p->time_index, nvars, nattrs, indexlen, fd->group->name, 
 
872
                               &file_info_buf, &file_info_buf_len);
 
873
            log_debug ("%s: put %s = buflen=%d time=%d nvars=%d nattr=%d index=%d name=%d:%s into space\n",
 
874
                __func__, ds_var_name, 
 
875
                *(int*)file_info_buf, *(int*)(file_info_buf+4), 
 
876
                *(int*)(file_info_buf+8), *(int*)(file_info_buf+12),
 
877
                *(int*)(file_info_buf+16), *(int*)(file_info_buf+20),
 
878
                file_info_buf+24);
 
879
            /* Flip 1st and 2nd dimension for DataSpaces representation for a 1D array*/
 
880
            ub[0] = file_info_buf_len-1; ub[1] = 0; ub[2] = 0;
 
881
            ds_dimension_ordering(1, 0, 0, didx); // C ordering of 1D array into DS
 
882
            dspaces_put_sync(); //wait on previous put to finish
 
883
            dspaces_put(ds_var_name, version, 1,    0, 0, 0, /* lb 0..2 */
 
884
                     ub[didx[0]], ub[didx[1]], ub[didx[2]], file_info_buf); 
 
885
 
 
886
            /* Create and put VERSION@fn version info into space */
 
887
            int version_buf[2] = {version, 0}; /* last version put in space; not terminated */
 
888
            int version_buf_len = 2; 
 
889
            snprintf (ds_var_name, MAX_DS_NAMELEN, "VERSION@%s", fd->name);
 
890
            log_debug ("%s: put %s with buf = [%d,%d] (len=%d integers) into space\n", 
 
891
                       __func__, ds_var_name, version_buf[0], version_buf[1], version_buf_len);
 
892
            ub[0] = version_buf_len-1; ub[1] = 0; ub[2] = 0;
 
893
            ds_dimension_ordering(1, 0, 0, didx); // C ordering of 1D array into DS
 
894
            dspaces_put_sync(); //wait on previous put to finish
 
895
            dspaces_put(ds_var_name, 0, sizeof(int),    0, 0, 0, /* lb 0..2 */
 
896
                     ub[didx[0]], ub[didx[1]], ub[didx[2]],  version_buf); 
 
897
            dspaces_put_sync(); //wait on previous put to finish
 
898
            
 
899
        }
 
900
 
 
901
        // remember this filename and its version for finalize
 
902
        int i;
 
903
        for (i=0; i<p->num_of_files; i++) {
 
904
            if (!strcmp(fd->name, p->fnames[i]))
 
905
                break;
 
906
        }
 
907
        if (i == p->num_of_files) {
 
908
            if (p->num_of_files < MAX_NUM_OF_FILES) {
 
909
                p->fnames[ p->num_of_files ] = strdup(fd->name);
 
910
                p->num_of_files++;
 
911
            } else {
 
912
                log_error ("%s: Max %d files can be written by one application "
 
913
                        "using the DATASPACES method\n",
 
914
                        __func__, MAX_NUM_OF_FILES);
 
915
            }
 
916
        }
 
917
        if (i < p->num_of_files) {
 
918
            p->fversions[i] = version;
 
919
        }
 
920
 
 
921
 
 
922
        // free allocated index lists
 
923
        adios_clear_index_v1 (pg_root, vars_root, attrs_root);
 
924
 
 
925
        // rank=0 may be in put_sync when others call unlock, which is a global op
 
926
        MPI_Barrier (p->mpi_comm); 
 
927
        //log_debug("%s: call dspaces_put_sync()\n", __func__);
 
928
        //dspaces_put_sync();
 
929
        log_debug("%s: call dspaces_unlock_on_write(%s)\n", __func__, fd->name);
 
930
        dspaces_unlock_on_write(fd->name, &p->mpi_comm);
 
931
    }
 
932
    else if( fd->mode == adios_mode_read )
 
933
    {
 
934
        dspaces_unlock_on_read(fd->name, &p->mpi_comm);
 
935
    } 
 
936
 
 
937
    /* Increment the time index */
 
938
    p->time_index++;
 
939
 
 
940
 
 
941
    log_info ("%s: exit\n", __func__);
 
942
}
 
943
 
 
944
void adios_dataspaces_finalize (int mype, struct adios_method_struct * method)
 
945
{
 
946
    struct adios_ds_data_struct *p = (struct adios_ds_data_struct *)
 
947
        method->method_data;
 
948
    int i;
 
949
    char ds_var_name[MAX_DS_NAMELEN];
 
950
    int lb[3] = {0,0,0}; 
 
951
    int ub[3] = {1,0,0}; // we put 2 integers to space, 
 
952
    int didx[3]; // for reordering DS dimensions
 
953
    int value[2] = {0, 1}; // integer to be written to space (terminated=1)
 
954
 
 
955
    // tell the readers which files are finalized
 
956
    ds_dimension_ordering(1, 0, 0, didx); // C ordering of 1D array into DS
 
957
    for (i=0; i<p->num_of_files; i++) {
 
958
        /* Put VERSION@fn into space. Indicates that this file will not be extended anymore.  */
 
959
        log_debug("%s: call dspaces_lock_on_write(%s), rank=%d\n", __func__, p->fnames[i], mype);
 
960
        dspaces_lock_on_write(p->fnames[i], &p->mpi_comm); // lock is global operation in DataSpaces
 
961
        if (p->rank == 0) {
 
962
            value[0] = p->fversions[i];
 
963
            snprintf(ds_var_name, MAX_DS_NAMELEN, "VERSION@%s", p->fnames[i]);
 
964
            log_debug ("%s: update %s in the space [%d, %d]\n", 
 
965
                    __func__, ds_var_name, value[0], value[1] );
 
966
            dspaces_put(ds_var_name, 0, sizeof(int),   
 
967
                    lb[didx[0]], lb[didx[1]], lb[didx[2]], 
 
968
                    ub[didx[0]], ub[didx[1]], ub[didx[2]],  
 
969
                    &value); 
 
970
            log_debug("%s: call dspaces_put_sync()\n", __func__);
 
971
            dspaces_put_sync();
 
972
        }
 
973
        log_debug("%s: call dspaces_unlock_on_write(%s), rank=%d\n", __func__, p->fnames[i], mype);
 
974
        dspaces_unlock_on_write(p->fnames[i], &p->mpi_comm);
 
975
        free (p->fnames[i]);
 
976
    }
 
977
 
 
978
    // disconnect from dataspaces if we are connected from writer but not anymore from reader
 
979
    if (globals_adios_is_dataspaces_connected_from_writer() && 
 
980
            !globals_adios_is_dataspaces_connected_from_both())
 
981
    {
 
982
        log_debug ("%s: call dspaces_barrier(), rank=%d\n", __func__,mype);
 
983
        dspaces_barrier();
 
984
        log_debug ("%s: call dspaces_finalize(), rank=%d\n", __func__,mype);
 
985
        dspaces_finalize();
 
986
 
 
987
    }
 
988
    globals_adios_set_dataspaces_disconnected_from_writer();
 
989
 
 
990
    adios_dataspaces_initialized = 0;
 
991
 
 
992
    log_info("%s: exit\n", __func__);
 
993
}
 
994
 
 
995
void adios_dataspaces_end_iteration (struct adios_method_struct * method)
 
996
{
 
997
}
 
998
 
 
999
void adios_dataspaces_start_calculation (struct adios_method_struct * method)
 
1000
{
 
1001
}
 
1002
 
 
1003
void adios_dataspaces_stop_calculation (struct adios_method_struct * method)
 
1004
{
 
1005
}
 
1006