~ubuntu-branches/ubuntu/utopic/adios/utopic

« back to all changes in this revision

Viewing changes to src/nssi/nssi_staging_server.cpp

  • Committer: Package Import Robot
  • Author(s): Alastair McKinstry
  • Date: 2013-12-09 15:21:31 UTC
  • mfrom: (1.1.2)
  • Revision ID: package-import@ubuntu.com-20131209152131-jtd4fpmdv3xnunnm
Tags: 1.5.0-1
* New upstream.
* Standards-Version: 3.9.5
* Include latest config.{sub,guess} 
* New watch file.
* Create libadios-bin for binaries.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/**  @file main.c
 
2
 *
 
3
 *   @brief Driver for the LWFS name server.
 
4
 *
 
5
 *   @author Ron Oldfield (raoldfi\@sandia.gov).
 
6
 *   $Revision: 1264 $.
 
7
 *   $Date: 2007-02-27 15:30:26 -0700 (Tue, 27 Feb 2007) $.
 
8
 */
 
9
 
 
10
 
 
11
#ifndef __STDC_CONSTANT_MACROS
 
12
#define __STDC_CONSTANT_MACROS
 
13
#endif
 
14
 
 
15
#include "config.h"
 
16
 
 
17
#include <unistd.h>
 
18
#include <string.h>
 
19
#include <stdlib.h>
 
20
#include <sys/mman.h>
 
21
 
 
22
#include "adios.h"
 
23
 
 
24
#ifdef HAVE_NSSI
 
25
#ifdef HAVE_PORTALS
 
26
#include "nssi_ptls.h"
 
27
#endif
 
28
#ifdef HAVE_INFINIBAND
 
29
#include "nssi_ib.h"
 
30
#endif
 
31
#include "nssi_server.h"
 
32
#include "nssi_logger.h"
 
33
 
 
34
#include "adios_nssi_args.h"
 
35
#include "adios_nssi_config.h"
 
36
#endif
 
37
 
 
38
#include "io_timer.h"
 
39
#include "aggregation.h"
 
40
 
 
41
#include <mpi.h>
 
42
#include <algorithm>
 
43
#include <map>
 
44
 
 
45
using namespace std;
 
46
 
 
47
 
 
48
 
 
49
#ifdef __LIBCATAMOUNT__
 
50
#define ntohs(value) 0
 
51
#endif
 
52
 
 
53
 
 
54
/* Need a struct to encapsulate open file info.
 
55
 */
 
56
struct open_file {
 
57
    char    ofname[ADIOS_PATH_MAX];
 
58
    int64_t ofdesc;
 
59
 
 
60
    open_file(const char *name) {
 
61
        strcpy(ofname, name);
 
62
        ofdesc=-1;
 
63
    }
 
64
    open_file(const char *name, const int64_t desc) {
 
65
        strcpy(ofname, name);
 
66
        ofdesc=desc;
 
67
    }
 
68
};
 
69
/* Need a comparison operator to pass into the open_file_map
 
70
 */
 
71
struct open_file_lt
 
72
{
 
73
    bool operator()(const struct open_file &of1, const struct open_file &of2) const
 
74
    {
 
75
//        log_debug(rpc_debug_level, "cqp1.qp_num == %u", cqp1.qp_num);
 
76
//        log_debug(rpc_debug_level, "cqp2.qp_num == %u", cqp2.qp_num);
 
77
 
 
78
        if (strcmp(of1.ofname, of2.ofname) <0) return TRUE;
 
79
 
 
80
        return FALSE;
 
81
    }
 
82
};
 
83
 
 
84
/* Map of open files */
 
85
static map<struct open_file, int64_t, open_file_lt> open_file_map;
 
86
typedef map<struct open_file, int64_t, open_file_lt>::iterator open_file_map_iterator_t;
 
87
typedef pair<struct open_file, int64_t> open_file_map_pair_t;
 
88
static pthread_mutex_t open_file_map_mutex=PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
 
89
static pthread_cond_t  open_file_map_cond =PTHREAD_COND_INITIALIZER;
 
90
 
 
91
 
 
92
static int global_rank=-1;
 
93
static int DEBUG=0;
 
94
 
 
95
 
 
96
/* -------------------- PRIVATE FUNCTIONS ---------- */
 
97
void open_file_add(char *fname, int64_t fd)
 
98
{
 
99
    open_file_map[fname]=fd;
 
100
}
 
101
 
 
102
int64_t open_file_get(char *fname)
 
103
{
 
104
    open_file of(fname);
 
105
    int64_t fd=-1;
 
106
 
 
107
    open_file_map_iterator_t iter=open_file_map.find(of);
 
108
    if (iter != open_file_map.end()) {
 
109
        fd=iter->second;
 
110
    }
 
111
 
 
112
    return(fd);
 
113
}
 
114
void open_file_del(char *fname)
 
115
{
 
116
    open_file_map_iterator_t iter;
 
117
    open_file of(fname);
 
118
 
 
119
    open_file_map.erase(of);
 
120
}
 
121
 
 
122
 
 
123
 
 
124
int grank, gsize;
 
125
 
 
126
MPI_Comm comm_self=MPI_COMM_SELF;
 
127
MPI_Comm comm_world=MPI_COMM_WORLD;
 
128
 
 
129
 
 
130
/**
 
131
 * The next 3 utility functions are lifted from IOR.
 
132
 */
 
133
/******************************************************************************/
 
134
/*
 
135
 * Extract key/value pair from hint string.
 
136
 */
 
137
 
 
138
void
 
139
ExtractHint(char * settingVal,
 
140
            char * valueVal,
 
141
            char * hintString)
 
142
{
 
143
    char * settingPtr,
 
144
         * valuePtr,
 
145
         * tmpPtr1,
 
146
         * tmpPtr2;
 
147
 
 
148
    settingPtr = (char *)strtok(hintString, "=");
 
149
    valuePtr = (char *)strtok(NULL, " \t\r\n");
 
150
    tmpPtr1 = settingPtr;
 
151
    tmpPtr2 = (char *)strstr(settingPtr, "MPIIO_HINT__");
 
152
    if (tmpPtr1 == tmpPtr2) {
 
153
        settingPtr += strlen("MPIIO_HINT__");
 
154
    }
 
155
    strcpy(settingVal, settingPtr);
 
156
    strcpy(valueVal, valuePtr);
 
157
} /* ExtractHint() */
 
158
 
 
159
 
 
160
/******************************************************************************/
 
161
/*
 
162
 * Set hints for MPIIO, HDF5, or NCMPI.
 
163
 */
 
164
#define MAX_HINT_STR 1024
 
165
void
 
166
SetHints(MPI_Info * mpiHints, char * hintsFileName)
 
167
{
 
168
    char           hintString[MAX_HINT_STR],
 
169
                   settingVal[MAX_HINT_STR],
 
170
                   valueVal[MAX_HINT_STR];
 
171
    extern char ** environ;
 
172
    int            i;
 
173
    FILE         * fd;
 
174
 
 
175
    /*
 
176
     * This routine checks for hints from the environment and/or from the
 
177
     * hints files.  The hints are of the form:
 
178
     * 'MPIIO_HINT__<hint>=<value>', <hint> is the full name of the hint
 
179
     * to be set, and <value> is the hint value.
 
180
     * E.g., 'setenv MPIIO_HINT__panfs_concurrent_write 1'
 
181
     * or 'MPIIO_HINT__panfs_concurrent_write=1' in the hints file.
 
182
     */
 
183
    MPI_Info_create(mpiHints);
 
184
 
 
185
    /* get hints from environment */
 
186
    for (i = 0; environ[i] != NULL; i++) {
 
187
        /* if this is an IOR_HINT, pass the hint to the info object */
 
188
        if (strncmp(environ[i], "MPIIO_HINT", strlen("MPIIO_HINT")) == 0) {
 
189
            strcpy(hintString, environ[i]);
 
190
            ExtractHint(settingVal, valueVal, hintString);
 
191
            MPI_Info_set(*mpiHints, settingVal, valueVal);
 
192
        }
 
193
    }
 
194
 
 
195
    /* get hints from hints file */
 
196
    if (strcmp(hintsFileName, "") != 0) {
 
197
 
 
198
        /* open the hint file */
 
199
        fd = fopen(hintsFileName, "r");
 
200
        if (fd == NULL) {
 
201
            printf("cannot open hints file\n");
 
202
        } else {
 
203
            /* iterate over hints file */
 
204
            while(fgets(hintString, MAX_HINT_STR, fd) != NULL) {
 
205
                if (strncmp(hintString, "MPIIO_HINT", strlen("MPIIO_HINT")) == 0) {
 
206
                    ExtractHint(settingVal, valueVal, hintString);
 
207
                    MPI_Info_set(*mpiHints, settingVal, valueVal);
 
208
                }
 
209
            }
 
210
            /* close the hints files */
 
211
            if (fclose(fd) != 0) printf("cannot close hints file\n");
 
212
        }
 
213
    }
 
214
} /* SetHints() */
 
215
 
 
216
 
 
217
/******************************************************************************/
 
218
/*
 
219
 * Show all hints (key/value pairs) in an MPI_Info object.
 
220
 */
 
221
 
 
222
void ShowHints(MPI_Info * mpiHints)
 
223
{
 
224
    char key[MPI_MAX_INFO_VAL],
 
225
         value[MPI_MAX_INFO_VAL];
 
226
    int  flag,
 
227
         i,
 
228
         nkeys;
 
229
 
 
230
    MPI_Info_get_nkeys(*mpiHints, &nkeys);
 
231
 
 
232
    for (i = 0; i < nkeys; i++) {
 
233
        MPI_Info_get_nthkey(*mpiHints, i, key);
 
234
        MPI_Info_get(*mpiHints, key, MPI_MAX_INFO_VAL-1, value, &flag);
 
235
        printf("mpiHint[%d]: %s = %s\n", i, key, value);
 
236
    }
 
237
} /* ShowHints() */
 
238
 
 
239
 
 
240
 
 
241
 
 
242
#ifdef __cplusplus
 
243
extern "C" {
 
244
#endif
 
245
 
 
246
extern int adios_nssi_filter_is_anon_dim(
 
247
        int fd,
 
248
        const char *dimname);
 
249
extern void adios_nssi_filter_set_anon_dim(
 
250
        int fd,
 
251
        const char *dimname,
 
252
        const uint64_t dimvalue);
 
253
 
 
254
#ifdef __cplusplus
 
255
}
 
256
#endif
 
257
 
 
258
void agg_and_write(const int64_t fd)
 
259
{
 
260
    if (DEBUG>2) printf("myrank(%d): enter agg_and_write(fd=%ld)\n", grank, fd);
 
261
 
 
262
    Func_Timer("try_aggregation", try_aggregation(fd););  // aggregate all varids in this file
 
263
 
 
264
    int chunk_count=0;
 
265
    aggregation_chunk_details_t **chunks = get_chunks(fd, &chunk_count);
 
266
 
 
267
    for (int j=0;j<chunk_count;j++) {
 
268
        aggregation_chunk_details_t *chunk = chunks[j];
 
269
 
 
270
        // write all offsets for clients rank to update adios internals
 
271
        for(int i=0;i<chunk->ndims;i++) {
 
272
            uint64_t value=0;
 
273
            if (DEBUG>3) printf("writing myrank(%d) chunk(%d) vpath(%s) vname(%s) opath(%s) oname(%s) odata(%lu)\n",
 
274
                    grank, j, chunk->var_path, chunk->var_name, chunk->offset_path[i], chunk->offset_name[i], chunk->offset[i]);
 
275
            if (adios_nssi_filter_is_anon_dim(chunk->fd, chunk->offset_name[i]) == TRUE) {
 
276
                adios_nssi_filter_set_anon_dim(chunk->fd, chunk->offset_name[i], chunk->offset[i]);
 
277
            } else {
 
278
                if (DEBUG>2) printf("server_rank(%d) writing aggregated offset vname(%s)\n", global_rank, chunk->offset_name[i]);
 
279
                Func_Timer("adios_set_path_var", adios_set_path_var(chunk->fd, chunk->offset_path[i], chunk->offset_name[i]););
 
280
                Func_Timer("adios_write", adios_write(chunk->fd, chunk->offset_name[i], &(chunk->offset[i])););
 
281
            }
 
282
        }
 
283
        for(int i=0;i<chunk->ndims;i++) {
 
284
            uint64_t value=0;
 
285
            if (DEBUG>3) printf("writing myrank(%d) chunk(%d) vpath(%s) vname(%s) dpath(%s) dname(%s) ddata(%lu)\n",
 
286
                    grank, j, chunk->var_path, chunk->var_name, chunk->count_path[i], chunk->count_name[i], chunk->count[i]);
 
287
            if (adios_nssi_filter_is_anon_dim(chunk->fd, chunk->count_name[i]) == TRUE) {
 
288
                adios_nssi_filter_set_anon_dim(chunk->fd, chunk->count_name[i], chunk->count[i]);
 
289
            } else {
 
290
                if (DEBUG>2) printf("server_rank(%d) writing aggregated dim vname(%s)\n", global_rank, chunk->count_name[i]);
 
291
                Func_Timer("adios_set_path_var", adios_set_path_var(chunk->fd, chunk->count_path[i], chunk->count_name[i]););
 
292
                Func_Timer("adios_write", adios_write(chunk->fd, chunk->count_name[i], &(chunk->count[i])););
 
293
            }
 
294
        }
 
295
 
 
296
        if (DEBUG>3) printf("writing myrank(%d) vname(%s)\n", grank, chunk->var_name);
 
297
        if (DEBUG>2) printf("server_rank(%d) writing aggregated array vname(%s)\n", global_rank, chunk->var_name);
 
298
        Func_Timer("adios_set_path_var", adios_set_path_var(chunk->fd, chunk->var_path, chunk->var_name););
 
299
        Func_Timer("adios_write", adios_write(chunk->fd, chunk->var_name, chunk->buf););
 
300
 
 
301
//        cleanup_aggregation_chunks(fd, chunk->var_name);
 
302
    }
 
303
 
 
304
    cleanup_aggregation_chunks(fd);
 
305
 
 
306
    if (DEBUG>2) printf("myrank(%d): exit agg_and_write(fd=%ld)\n", grank, fd);
 
307
 
 
308
    return;
 
309
}
 
310
 
 
311
/* -------------------- SERVER-SIDE STUBS ---------- */
 
312
 
 
313
 
 
314
/**
 
315
 * @brief Open a netcdf dataset.
 
316
 *
 
317
 * Open an ADIOS dataset.
 
318
 */
 
319
int nssi_staging_open_stub(
 
320
        const unsigned long request_id,
 
321
        const nssi_remote_pid *caller,
 
322
        const adios_open_args *args,
 
323
        const nssi_rma *data_addr,
 
324
        const nssi_rma *res_addr)
 
325
{
 
326
    int rc = 0;
 
327
    char omode[2];
 
328
    adios_open_res res;  /* this is what we send back to the client */
 
329
    MPI_Info mpiHints = MPI_INFO_NULL;
 
330
 
 
331
    int64_t fd;
 
332
 
 
333
    memset(&res, 0, sizeof(res));
 
334
 
 
335
    if (DEBUG>2) printf("myrank(%d): enter nssi_staging_open_stub(%s, %d)\n", grank, args->fname, args->mode);
 
336
 
 
337
//    SetHints(&mpiHints, "");
 
338
//    ShowHints(&mpiHints);
 
339
 
 
340
    fd = open_file_get(args->fname);
 
341
    if (DEBUG>3) printf("myrank(%d): nssi_staging_open_stub(%s, %d) open_file_get()==%d\n", grank, args->fname, args->mode, fd);
 
342
    if (fd == -1) {
 
343
        omode[0]='\0';
 
344
        omode[1]='\0';
 
345
        switch(args->mode) {
 
346
        case ADIOS_MODE_READ:
 
347
            omode[0]='r';
 
348
            break;
 
349
        case ADIOS_MODE_WRITE:
 
350
            omode[0]='w';
 
351
            break;
 
352
        case ADIOS_MODE_APPEND:
 
353
            omode[0]='a';
 
354
            break;
 
355
        case ADIOS_MODE_UPDATE:
 
356
            omode[0]='u';
 
357
            break;
 
358
        default:
 
359
            break;
 
360
        }
 
361
 
 
362
        if (DEBUG>3) printf("start adios_open\n");
 
363
        if (args->use_single_server==TRUE) {
 
364
            if (DEBUG>3) printf("adios_open: using MPI_COMM_SELF\n");
 
365
            Func_Timer("adios_open", rc = adios_open(&fd, args->gname, args->fname, omode, comm_self););
 
366
        } else {
 
367
            if (DEBUG>3) printf("adios_open: using MPI_COMM_WORLD\n");
 
368
            Func_Timer("adios_open", rc = adios_open(&fd, args->gname, args->fname, omode, comm_world););
 
369
        }
 
370
        if (rc != 0) {
 
371
            printf("Error opening file \"%s\": %d\n", args->fname, rc);
 
372
            goto cleanup;
 
373
        }
 
374
        if (DEBUG>3) printf("end adios_open\n");
 
375
 
 
376
        open_file_add(args->fname, fd);
 
377
 
 
378
        add_file(fd, WRITE_CACHING_COLLECTIVE);
 
379
    }
 
380
 
 
381
    if (DEBUG>2) printf("myrank(%d): exit nssi_staging_open_stub(%s, %d): fd=%ld, fd=%p\n", grank, args->fname, args->mode, fd, fd);
 
382
 
 
383
    res.fd=fd;
 
384
 
 
385
cleanup:
 
386
    /* send the ncid and return code back to client */
 
387
    rc = nssi_send_result(caller, request_id, rc, &res, res_addr);
 
388
 
 
389
    return rc;
 
390
}
 
391
 
 
392
int nssi_staging_group_size_stub(
 
393
        const unsigned long request_id,
 
394
        const nssi_remote_pid *caller,
 
395
        const adios_group_size_args *args,
 
396
        const nssi_rma *data_addr,
 
397
        const nssi_rma *res_addr)
 
398
{
 
399
    int rc = 0;
 
400
    uint64_t total_size=0;
 
401
 
 
402
    if (DEBUG>2) printf("myrank(%d): enter nssi_staging_group_size_stub(fd=%ld, pg_size=%ld)\n", grank, args->fd, args->data_size);
 
403
 
 
404
    Func_Timer("adios_group_size", rc = adios_group_size(args->fd, args->data_size, &total_size););
 
405
    if (rc != 0) {
 
406
        printf("adios_group_size failed: %d\n", rc);
 
407
    }
 
408
 
 
409
    /* send result to client */
 
410
    rc = nssi_send_result(caller, request_id, rc, NULL, res_addr);
 
411
 
 
412
    if (DEBUG>2) printf("myrank(%d): exit nssi_staging_group_size_stub(%ld)\n", grank, args->fd);
 
413
 
 
414
    return rc;
 
415
}
 
416
 
 
417
int nssi_staging_close_stub(
 
418
        const unsigned long request_id,
 
419
        const nssi_remote_pid *caller,
 
420
        const adios_close_args *args,
 
421
        const nssi_rma *data_addr,
 
422
        const nssi_rma *res_addr)
 
423
{
 
424
    int rc = 0;
 
425
 
 
426
    if (DEBUG>2) printf("myrank(%d): enter nssi_staging_close_stub(%ld, %s)\n", grank, args->fd, args->fname);
 
427
 
 
428
    agg_and_write(args->fd);
 
429
 
 
430
    Func_Timer("adios_close", adios_close(args->fd););
 
431
 
 
432
    open_file_del(args->fname);
 
433
 
 
434
    /* send result to client */
 
435
    rc = nssi_send_result(caller, request_id, rc, NULL, res_addr);
 
436
 
 
437
    if (DEBUG>2) printf("myrank(%d): exit nssi_staging_close_stub(%ld, %s)\n", grank, args->fd, args->fname);
 
438
 
 
439
    return rc;
 
440
}
 
441
 
 
442
int nssi_staging_read_stub(
 
443
        const unsigned long request_id,
 
444
        const nssi_remote_pid *caller,
 
445
        const adios_read_args *args,
 
446
        const nssi_rma *data_addr,
 
447
        const nssi_rma *res_addr)
 
448
{
 
449
    int rc = 0;
 
450
    adios_read_res res;
 
451
    char vpathname[ADIOS_PATH_MAX];
 
452
    int pathlen;
 
453
    char *v=NULL;
 
454
 
 
455
    if (DEBUG>2) printf("myrank(%d): enter nssi_staging_read_stub(%ld)\n", grank, args->fd);
 
456
 
 
457
//    pathlen=strlen(args->vpath);
 
458
//    if (args->vpath[pathlen-1]=='/') {
 
459
//        sprintf(vpathname, "%s%s", args->vpath, args->vname);
 
460
//    } else {
 
461
//        sprintf(vpathname, "%s/%s", args->vpath, args->vname);
 
462
//    }
 
463
 
 
464
 
 
465
    for (int i=0;i<args->offsets.offsets_len;i++) {
 
466
        uint64_t *odata=(uint64_t *)malloc(sizeof(uint8_t));
 
467
        *odata = args->offsets.offsets_val[i].vdata;
 
468
        Func_Timer("adios_set_path_var", adios_set_path_var(args->fd, args->offsets.offsets_val[i].vpath, args->offsets.offsets_val[i].vname););
 
469
        Func_Timer("adios_read", adios_read(args->fd, args->offsets.offsets_val[i].vname, odata, 8););
 
470
    }
 
471
    for (int i=0;i<args->ldims.ldims_len;i++) {
 
472
        uint64_t *ddata=(uint64_t *)malloc(sizeof(uint8_t));
 
473
        *ddata = args->ldims.ldims_val[i].vdata;
 
474
        Func_Timer("adios_set_path_var", adios_set_path_var(args->fd, args->ldims.ldims_val[i].vpath, args->ldims.ldims_val[i].vname););
 
475
        Func_Timer("adios_read", adios_read(args->fd, args->ldims.ldims_val[i].vname, ddata, 8););
 
476
    }
 
477
 
 
478
    v=(char *)calloc(args->max_read, 1);
 
479
 
 
480
    Func_Timer("adios_set_path_var", adios_set_path_var(args->fd, args->vpath, args->vname););
 
481
    Func_Timer("adios_read", adios_read(args->fd, args->vname, v, args->max_read););
 
482
 
 
483
    res.bytes_read=args->max_read;
 
484
 
 
485
    Func_Timer("nssi_put_data", rc = nssi_put_data(caller, v, res.bytes_read, data_addr, -1););
 
486
    if (rc != NSSI_OK) {
 
487
        printf("Could not put var data on client\n");
 
488
        goto cleanup;
 
489
    }
 
490
 
 
491
cleanup:
 
492
    /* send result to client */
 
493
    rc = nssi_send_result(caller, request_id, rc, &res, res_addr);
 
494
 
 
495
    if (DEBUG>2) printf("myrank(%d): exit nssi_staging_read_stub(%ld)\n", grank, args->fd);
 
496
 
 
497
    return rc;
 
498
}
 
499
 
 
500
int nssi_staging_write_stub(
 
501
        const unsigned long request_id,
 
502
        const nssi_remote_pid *caller,
 
503
        const adios_write_args *args,
 
504
        const nssi_rma *data_addr,
 
505
        const nssi_rma *res_addr)
 
506
{
 
507
    int rc = 0;
 
508
    adios_write_res res;
 
509
    char vpathname[ADIOS_PATH_MAX];
 
510
    int pathlen;
 
511
    char *v=NULL;
 
512
    int i=0;
 
513
 
 
514
    if (DEBUG>2) printf("myrank(%d): enter nssi_staging_write_stub(fd=%ld, vsize=%ld)\n", grank, args->fd, args->vsize);
 
515
 
 
516
//    pathlen=strlen(args->vpath);
 
517
//    if (args->vpath[pathlen-1]=='/') {
 
518
//        sprintf(vpathname, "%s%s", args->vpath, args->vname);
 
519
//    } else {
 
520
//        sprintf(vpathname, "%s/%s", args->vpath, args->vname);
 
521
//    }
 
522
 
 
523
    v=(char *)malloc(args->vsize);
 
524
 
 
525
    Func_Timer("nssi_get_data", rc = nssi_get_data(caller, v, args->vsize, data_addr););
 
526
    if (rc != NSSI_OK) {
 
527
        printf("Could not get var data on client\n");
 
528
        goto cleanup;
 
529
    }
 
530
 
 
531
    if (DEBUG>3) printf("server_rank(%d) vname(%s) vsize(%ld) is_scalar(%d) writer_rank(%ld)\n", global_rank, args->vname, args->vsize, args->is_scalar, args->writer_rank);
 
532
 
 
533
    if (!args->is_scalar) {
 
534
        if (DEBUG>2) printf("server_rank(%d) caching non-scalar vname(%s) vsize(%ld)\n", global_rank, args->vname, args->vsize);
 
535
        if (DEBUG>3) printf("allocated v(%p), len(%ld)\n", v, args->vsize);
 
536
        aggregation_chunk_details_t *chunk=NULL;
 
537
        chunk = new aggregation_chunk_details_t;
 
538
        chunk->fd = args->fd;
 
539
        strcpy(chunk->var_path, args->vpath);
 
540
        strcpy(chunk->var_name, args->vname);
 
541
        chunk->ndims = args->offsets.offsets_len;
 
542
        chunk->buf = v;
 
543
        chunk->atype = (enum ADIOS_DATATYPES)args->atype;
 
544
        chunk->len   = args->vsize;
 
545
        chunk->num_elements = 1;
 
546
        for (int i=0;i<args->ldims.ldims_len;i++) {
 
547
            chunk->num_elements *= args->ldims.ldims_val[i].vdata;
 
548
        }
 
549
        chunk->offset_path = (char **)calloc(args->offsets.offsets_len, sizeof(char *));
 
550
        chunk->offset_name = (char **)calloc(args->offsets.offsets_len, sizeof(char *));
 
551
        chunk->offset = (uint64_t *)calloc(args->offsets.offsets_len, sizeof(uint64_t));
 
552
        for (int i=0;i<args->offsets.offsets_len;i++) {
 
553
            chunk->offset_path[i] = strdup(args->offsets.offsets_val[i].vpath);
 
554
            chunk->offset_name[i] = strdup(args->offsets.offsets_val[i].vname);
 
555
            chunk->offset[i] = args->offsets.offsets_val[i].vdata;
 
556
        }
 
557
        chunk->count_path = (char **)calloc(args->ldims.ldims_len, sizeof(char *));
 
558
        chunk->count_name = (char **)calloc(args->ldims.ldims_len, sizeof(char *));
 
559
        chunk->count  = (uint64_t *)calloc(args->ldims.ldims_len, sizeof(uint64_t));;
 
560
        for (int i=0;i<args->ldims.ldims_len;i++) {
 
561
            chunk->count_path[i] = strdup(args->ldims.ldims_val[i].vpath);
 
562
            chunk->count_name[i] = strdup(args->ldims.ldims_val[i].vname);
 
563
            chunk->count[i] = args->ldims.ldims_val[i].vdata;
 
564
        }
 
565
        add_chunk(chunk);
 
566
 
 
567
    } else {
 
568
        if (DEBUG>2) printf("server_rank(%d) writing scalar vname(%s) vsize(%ld)\n", global_rank, args->vname, args->vsize);
 
569
        Func_Timer("adios_set_path_var", adios_set_path_var(args->fd, args->vpath, args->vname););
 
570
        Func_Timer("adios_write", adios_write(args->fd, args->vname, v););
 
571
 
 
572
        free(v);
 
573
    }
 
574
 
 
575
    res.bytes_written=args->vsize;
 
576
 
 
577
cleanup:
 
578
 
 
579
    /* send result to client */
 
580
    rc = nssi_send_result(caller, request_id, rc, &res, res_addr);
 
581
 
 
582
    if (DEBUG>2) printf("myrank(%d): exit nssi_staging_write_stub(fd=%ld, vsize=%ld)\n", grank, args->fd, args->vsize);
 
583
 
 
584
    return rc;
 
585
}
 
586
 
 
587
int nssi_staging_start_calc_stub(
 
588
        const unsigned long request_id,
 
589
        const nssi_remote_pid *caller,
 
590
        const adios_start_calc_args *args,
 
591
        const nssi_rma *data_addr,
 
592
        const nssi_rma *res_addr)
 
593
{
 
594
    int rc = 0;
 
595
 
 
596
    if (DEBUG>2) printf("myrank(%d): enter nssi_staging_start_calc_stub(%ld)\n", grank, args->fd);
 
597
 
 
598
    agg_and_write(args->fd);
 
599
 
 
600
    /* send result to client */
 
601
    rc = nssi_send_result(caller, request_id, rc, NULL, res_addr);
 
602
 
 
603
    if (DEBUG>2) printf("myrank(%d): exit nssi_staging_start_calc_stub(%ld)\n", grank, args->fd);
 
604
 
 
605
    return rc;
 
606
}
 
607
 
 
608
int nssi_staging_stop_calc_stub(
 
609
        const unsigned long request_id,
 
610
        const nssi_remote_pid *caller,
 
611
        const adios_stop_calc_args *args,
 
612
        const nssi_rma *data_addr,
 
613
        const nssi_rma *res_addr)
 
614
{
 
615
    int rc = 0;
 
616
 
 
617
    if (DEBUG>2) printf("myrank(%d): enter nssi_staging_stop_calc_stub(%ld)\n", grank, args->fd);
 
618
 
 
619
 
 
620
    /* send result to client */
 
621
    rc = nssi_send_result(caller, request_id, rc, NULL, res_addr);
 
622
 
 
623
    if (DEBUG>2) printf("myrank(%d): exit nssi_staging_stop_calc_stub(%ld)\n", grank, args->fd);
 
624
 
 
625
    return rc;
 
626
}
 
627
 
 
628
int nssi_staging_end_iter_stub(
 
629
        const unsigned long request_id,
 
630
        const nssi_remote_pid *caller,
 
631
        const adios_end_iter_args *args,
 
632
        const nssi_rma *data_addr,
 
633
        const nssi_rma *res_addr)
 
634
{
 
635
    int rc = 0;
 
636
 
 
637
    if (DEBUG>2) printf("myrank(%d): enter nssi_staging_end_iter_stub(%ld)\n", grank, args->fd);
 
638
 
 
639
 
 
640
    /* send result to client */
 
641
    rc = nssi_send_result(caller, request_id, rc, NULL, res_addr);
 
642
 
 
643
    if (DEBUG>2) printf("myrank(%d): exit nssi_staging_end_iter_stub(%ld)\n", grank, args->fd);
 
644
 
 
645
    return rc;
 
646
}
 
647
 
 
648
int nssi_staging_finalize_stub(
 
649
        const unsigned long request_id,
 
650
        const nssi_remote_pid *caller,
 
651
        const adios_finalize_args *args,
 
652
        const nssi_rma *data_addr,
 
653
        const nssi_rma *res_addr)
 
654
{
 
655
    int rc = 0;
 
656
 
 
657
    if (DEBUG>2) printf("myrank(%d): enter nssi_staging_finalize_stub(%s)\n", grank, args->client_id);
 
658
 
 
659
    /*
 
660
     *
 
661
     * do nothing
 
662
     *
 
663
     */
 
664
 
 
665
    /* send result to client */
 
666
    rc = nssi_send_result(caller, request_id, rc, NULL, res_addr);
 
667
 
 
668
    if (DEBUG>2) printf("myrank(%d): exit nssi_staging_finalize_stub(%s)\n", grank, args->client_id);
 
669
 
 
670
    return rc;
 
671
}
 
672
 
 
673
/* -------- END SERVER-SIDE STUBS -------------- */
 
674
 
 
675
int nssi_staging_server_init(const char *adios_config_file)
 
676
{
 
677
    int rc=NSSI_OK;
 
678
 
 
679
    if (DEBUG>3) printf("start adios_init(%s)\n", adios_config_file);
 
680
    rc = adios_init(adios_config_file);
 
681
    if (rc != 1) {
 
682
        printf("adios_init() failed: %d\n", rc);
 
683
        return(-1);
 
684
    }
 
685
    if (DEBUG>3) printf("end adios_init(%s)\n", adios_config_file);
 
686
 
 
687
 
 
688
    /* register server stubs */
 
689
    NSSI_REGISTER_SERVER_STUB(ADIOS_OPEN_OP, nssi_staging_open_stub, adios_open_args, adios_open_res);
 
690
    NSSI_REGISTER_SERVER_STUB(ADIOS_GROUP_SIZE_OP, nssi_staging_group_size_stub, adios_group_size_args, void);
 
691
    NSSI_REGISTER_SERVER_STUB(ADIOS_READ_OP, nssi_staging_read_stub, adios_read_args, adios_read_res);
 
692
    NSSI_REGISTER_SERVER_STUB(ADIOS_WRITE_OP, nssi_staging_write_stub, adios_write_args, adios_write_res);
 
693
    NSSI_REGISTER_SERVER_STUB(ADIOS_END_ITER_OP, nssi_staging_end_iter_stub, adios_end_iter_args, void);
 
694
    NSSI_REGISTER_SERVER_STUB(ADIOS_START_CALC_OP, nssi_staging_start_calc_stub, adios_start_calc_args, void);
 
695
    NSSI_REGISTER_SERVER_STUB(ADIOS_STOP_CALC_OP, nssi_staging_stop_calc_stub, adios_stop_calc_args, void);
 
696
    NSSI_REGISTER_SERVER_STUB(ADIOS_CLOSE_OP, nssi_staging_close_stub, adios_close_args, void);
 
697
    NSSI_REGISTER_SERVER_STUB(ADIOS_FINALIZE_OP, nssi_staging_finalize_stub, adios_finalize_args, void);
 
698
 
 
699
    return 0;
 
700
}
 
701
 
 
702
 
 
703
 
 
704
static void generate_contact_info(nssi_remote_pid *myid)
 
705
{
 
706
    nssi_remote_pid *all_pids=NULL;
 
707
    int rank, np;
 
708
    char contact_path[1024];
 
709
 
 
710
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
 
711
    //printf("rank (%d)\n", rank);
 
712
 
 
713
    if (rank==0) {
 
714
        MPI_Comm_size(MPI_COMM_WORLD, &np);
 
715
        all_pids=(nssi_remote_pid *)malloc(np*sizeof(nssi_remote_pid));
 
716
    }
 
717
    MPI_Gather(myid, sizeof(nssi_remote_pid), MPI_BYTE,
 
718
               all_pids, sizeof(nssi_remote_pid), MPI_BYTE,
 
719
               0, MPI_COMM_WORLD);
 
720
    if (rank==0) {
 
721
        char *contact_file=getenv("ADIOS_NSSI_CONTACT_INFO");
 
722
        if (contact_file==NULL) {
 
723
            printf("ADIOS_NSSI_CONTACT_INFO env var is undefined.\n");
 
724
            free(all_pids);
 
725
            return;
 
726
        }
 
727
//        sprintf(contact_path, "%s.%04d", contact_file, rank);
 
728
        sprintf(contact_path, "%s", contact_file);
 
729
        if (DEBUG>3) printf("creating contact file (%s)\n", contact_path);
 
730
        FILE *f=fopen(contact_path, "w");
 
731
        if (f==NULL) {
 
732
            perror("fopen");
 
733
        }
 
734
        for (int i=0;i<np;i++) {
 
735
            fprintf(f, "%u@%u@%s@%u\n",
 
736
                    all_pids[i].nid, all_pids[i].pid,
 
737
                    all_pids[i].hostname, (unsigned int)ntohs(all_pids[i].port));
 
738
        }
 
739
//        fprintf(f, "%u@%u@%s@%u\n",
 
740
//                myid->nid, myid->pid,
 
741
//                myid->hostname, (unsigned int)ntohs(myid->port));
 
742
        fclose(f);
 
743
        free(all_pids);
 
744
    }
 
745
    MPI_Barrier(MPI_COMM_WORLD);
 
746
}
 
747
 
 
748
 
 
749
/**
 
750
 * @brief The LWFS xfer-server.
 
751
 */
 
752
int main(int argc, char **argv)
 
753
{
 
754
    int rc = NSSI_OK;
 
755
 
 
756
    nssi_service nssi_svc;
 
757
//    log_level debug_level;
 
758
    char logfile[1024];
 
759
    int rank, np;
 
760
 
 
761
    MPI_Init(&argc, &argv);
 
762
    MPI_Comm_rank(MPI_COMM_WORLD, &global_rank);
 
763
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
 
764
    MPI_Comm_size(MPI_COMM_WORLD, &np);
 
765
    grank=rank;
 
766
    gsize=np;
 
767
 
 
768
    /* options that can be overriden by the command-line */
 
769
    bool daemon_flag = false;
 
770
    int verbose = 5;  /* default debug_level */
 
771
    int num_threads = 0;
 
772
    int server_pid = 128;   /* process ID of the server */
 
773
    int server_port = 7728; /* TCP port of the server */
 
774
 
 
775
    memset(&nssi_svc, 0, sizeof(nssi_service));
 
776
 
 
777
    /* initialize and enable logging */
 
778
//    if (args_info.logfile_arg != NULL) {
 
779
//        sprintf(logfile, "%s.%04d", "nssi_staging_server.log", rank);
 
780
//        logger_init((log_level)verbose, logfile);
 
781
//    } else {
 
782
//        logger_init((log_level)verbose, NULL);
 
783
//    }
 
784
//    netcdf_debug_level=(log_level)(log_level)args_info.verbose_arg;
 
785
//    debug_level = (log_level)args_info.verbose_arg;
 
786
 
 
787
//    logger_init((log_level)verbose, NULL);
 
788
 
 
789
    if (daemon_flag) {
 
790
        nssi_daemonize();
 
791
    }
 
792
 
 
793
#ifdef HAVE_PORTALS
 
794
    nssi_ptl_init(PTL_IFACE_SERVER, server_pid);
 
795
    rc = nssi_rpc_init(NSSI_RPC_PTL, NSSI_RPC_XDR);
 
796
    if (rc != NSSI_OK) {
 
797
        printf("could not init rpc: %s\n",
 
798
                nssi_err_str(rc));
 
799
        return rc;
 
800
    }
 
801
    nssi_remote_pid myid;
 
802
    memset(&myid, 0, sizeof(nssi_remote_pid));
 
803
    nssi_ptl_get_id(&myid);
 
804
    generate_contact_info(&myid);
 
805
#endif
 
806
#ifdef HAVE_INFINIBAND
 
807
    memset(&nssi_svc.req_addr.match_id, 0, sizeof(nssi_remote_pid));
 
808
    strcpy(nssi_svc.req_addr.match_id.hostname, args_info.server_addr_arg);
 
809
    nssi_svc.req_addr.match_id.port = args_info.server_port_arg;
 
810
 
 
811
    nssi_ib_init(&nssi_svc);
 
812
    rc = nssi_rpc_init(NSSI_RPC_IB, NSSI_RPC_XDR);
 
813
    if (rc != NSSI_OK) {
 
814
        printf("could not init rpc: %s\n",
 
815
                nssi_err_str(rc));
 
816
        return rc;
 
817
    }
 
818
    generate_contact_info(&nssi_svc.req_addr.match_id);
 
819
#endif
 
820
 
 
821
    if (DEBUG>3) printf("Initialize staging service\n");
 
822
 
 
823
    /* initialize the lwfs service */
 
824
    rc = nssi_service_init(0, NSSI_SHORT_REQUEST_SIZE, &nssi_svc);
 
825
    if (rc != NSSI_OK) {
 
826
        printf("could not init nssi_svc: %s\n",
 
827
                nssi_err_str(rc));
 
828
        return -1;
 
829
    }
 
830
 
 
831
    /* initialize staging service */
 
832
    rc = nssi_staging_server_init(argv[1]);
 
833
 
 
834
    /* start processing requests */
 
835
    nssi_svc.max_reqs = -1;
 
836
    rc = nssi_service_start(&nssi_svc, num_threads);
 
837
    if (rc != NSSI_OK) {
 
838
        printf("exited nssi_svc: %s\n",
 
839
                nssi_err_str(rc));
 
840
    }
 
841
 
 
842
    adios_finalize(rank);
 
843
 
 
844
    /* shutdown the nssi_svc */
 
845
    if (DEBUG>3) printf("shutting down service library\n");
 
846
    nssi_service_fini(&nssi_svc);
 
847
 
 
848
    nssi_rpc_fini();
 
849
 
 
850
    MPI_Finalize();
 
851
 
 
852
    return rc;
 
853
}