~ubuntu-branches/ubuntu/utopic/adios/utopic

« back to all changes in this revision

Viewing changes to src/nssi_staging_server.cpp

  • Committer: Package Import Robot
  • Author(s): Alastair McKinstry
  • Date: 2013-12-09 15:21:31 UTC
  • mfrom: (1.1.2)
  • Revision ID: package-import@ubuntu.com-20131209152131-jtd4fpmdv3xnunnm
Tags: 1.5.0-1
* New upstream.
* Standards-Version: 3.9.5
* Include latest config.{sub,guess} 
* New watch file.
* Create libadios-bin for binaries.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/**  @file main.c
2
 
 *
3
 
 *   @brief Driver for the LWFS name server.
4
 
 *
5
 
 *   @author Ron Oldfield (raoldfi\@sandia.gov).
6
 
 *   $Revision: 1264 $.
7
 
 *   $Date: 2007-02-27 15:30:26 -0700 (Tue, 27 Feb 2007) $.
8
 
 */
9
 
 
10
 
 
11
 
#ifndef __STDC_CONSTANT_MACROS
12
 
#define __STDC_CONSTANT_MACROS
13
 
#endif
14
 
 
15
 
#include "config.h"
16
 
 
17
 
#include <unistd.h>
18
 
#include <string.h>
19
 
#include <stdlib.h>
20
 
#include <sys/mman.h>
21
 
 
22
 
#include "adios.h"
23
 
 
24
 
#ifdef HAVE_NSSI
25
 
#ifdef HAVE_PORTALS
26
 
#include "nssi_ptls.h"
27
 
#endif
28
 
#ifdef HAVE_INFINIBAND
29
 
#include "nssi_ib.h"
30
 
#endif
31
 
#include "nssi_server.h"
32
 
#include "nssi_logger.h"
33
 
 
34
 
#include "adios_nssi_args.h"
35
 
#include "adios_nssi_config.h"
36
 
#endif
37
 
 
38
 
#include "io_timer.h"
39
 
#include "aggregation.h"
40
 
 
41
 
#include <mpi.h>
42
 
#include <algorithm>
43
 
#include <map>
44
 
 
45
 
using namespace std;
46
 
 
47
 
 
48
 
 
49
 
#ifdef __LIBCATAMOUNT__
50
 
#define ntohs(value) 0
51
 
#endif
52
 
 
53
 
 
54
 
/* Need a struct to encapsulate open file info.
55
 
 */
56
 
struct open_file {
57
 
    char    ofname[ADIOS_PATH_MAX];
58
 
    int64_t ofdesc;
59
 
 
60
 
    open_file(const char *name) {
61
 
        strcpy(ofname, name);
62
 
        ofdesc=-1;
63
 
    }
64
 
    open_file(const char *name, const int64_t desc) {
65
 
        strcpy(ofname, name);
66
 
        ofdesc=desc;
67
 
    }
68
 
};
69
 
/* Need a comparison operator to pass into the open_file_map
70
 
 */
71
 
struct open_file_lt
72
 
{
73
 
    bool operator()(const struct open_file &of1, const struct open_file &of2) const
74
 
    {
75
 
//        log_debug(rpc_debug_level, "cqp1.qp_num == %u", cqp1.qp_num);
76
 
//        log_debug(rpc_debug_level, "cqp2.qp_num == %u", cqp2.qp_num);
77
 
 
78
 
        if (strcmp(of1.ofname, of2.ofname) <0) return TRUE;
79
 
 
80
 
        return FALSE;
81
 
    }
82
 
};
83
 
 
84
 
/* Map of open files */
85
 
static map<struct open_file, int64_t, open_file_lt> open_file_map;
86
 
typedef map<struct open_file, int64_t, open_file_lt>::iterator open_file_map_iterator_t;
87
 
typedef pair<struct open_file, int64_t> open_file_map_pair_t;
88
 
static pthread_mutex_t open_file_map_mutex=PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
89
 
static pthread_cond_t  open_file_map_cond =PTHREAD_COND_INITIALIZER;
90
 
 
91
 
 
92
 
static int global_rank=-1;
93
 
static int DEBUG=0;
94
 
 
95
 
 
96
 
/* -------------------- PRIVATE FUNCTIONS ---------- */
97
 
void open_file_add(char *fname, int64_t fd)
98
 
{
99
 
    open_file_map[fname]=fd;
100
 
}
101
 
 
102
 
int64_t open_file_get(char *fname)
103
 
{
104
 
    open_file of(fname);
105
 
    int64_t fd=-1;
106
 
 
107
 
    open_file_map_iterator_t iter=open_file_map.find(of);
108
 
    if (iter != open_file_map.end()) {
109
 
        fd=iter->second;
110
 
    }
111
 
 
112
 
    return(fd);
113
 
}
114
 
void open_file_del(char *fname)
115
 
{
116
 
    open_file_map_iterator_t iter;
117
 
    open_file of(fname);
118
 
 
119
 
    open_file_map.erase(of);
120
 
}
121
 
 
122
 
 
123
 
 
124
 
int grank, gsize;
125
 
 
126
 
MPI_Comm comm_self=MPI_COMM_SELF;
127
 
MPI_Comm comm_world=MPI_COMM_WORLD;
128
 
 
129
 
 
130
 
/**
131
 
 * The next 3 utility functions are lifted from IOR.
132
 
 */
133
 
/******************************************************************************/
134
 
/*
135
 
 * Extract key/value pair from hint string.
136
 
 */
137
 
 
138
 
void
139
 
ExtractHint(char * settingVal,
140
 
            char * valueVal,
141
 
            char * hintString)
142
 
{
143
 
    char * settingPtr,
144
 
         * valuePtr,
145
 
         * tmpPtr1,
146
 
         * tmpPtr2;
147
 
 
148
 
    settingPtr = (char *)strtok(hintString, "=");
149
 
    valuePtr = (char *)strtok(NULL, " \t\r\n");
150
 
    tmpPtr1 = settingPtr;
151
 
    tmpPtr2 = (char *)strstr(settingPtr, "MPIIO_HINT__");
152
 
    if (tmpPtr1 == tmpPtr2) {
153
 
        settingPtr += strlen("MPIIO_HINT__");
154
 
    }
155
 
    strcpy(settingVal, settingPtr);
156
 
    strcpy(valueVal, valuePtr);
157
 
} /* ExtractHint() */
158
 
 
159
 
 
160
 
/******************************************************************************/
161
 
/*
162
 
 * Set hints for MPIIO, HDF5, or NCMPI.
163
 
 */
164
 
#define MAX_HINT_STR 1024
165
 
void
166
 
SetHints(MPI_Info * mpiHints, char * hintsFileName)
167
 
{
168
 
    char           hintString[MAX_HINT_STR],
169
 
                   settingVal[MAX_HINT_STR],
170
 
                   valueVal[MAX_HINT_STR];
171
 
    extern char ** environ;
172
 
    int            i;
173
 
    FILE         * fd;
174
 
 
175
 
    /*
176
 
     * This routine checks for hints from the environment and/or from the
177
 
     * hints files.  The hints are of the form:
178
 
     * 'MPIIO_HINT__<hint>=<value>', <hint> is the full name of the hint
179
 
     * to be set, and <value> is the hint value.
180
 
     * E.g., 'setenv MPIIO_HINT__panfs_concurrent_write 1'
181
 
     * or 'MPIIO_HINT__panfs_concurrent_write=1' in the hints file.
182
 
     */
183
 
    MPI_Info_create(mpiHints);
184
 
 
185
 
    /* get hints from environment */
186
 
    for (i = 0; environ[i] != NULL; i++) {
187
 
        /* if this is an IOR_HINT, pass the hint to the info object */
188
 
        if (strncmp(environ[i], "MPIIO_HINT", strlen("MPIIO_HINT")) == 0) {
189
 
            strcpy(hintString, environ[i]);
190
 
            ExtractHint(settingVal, valueVal, hintString);
191
 
            MPI_Info_set(*mpiHints, settingVal, valueVal);
192
 
        }
193
 
    }
194
 
 
195
 
    /* get hints from hints file */
196
 
    if (strcmp(hintsFileName, "") != 0) {
197
 
 
198
 
        /* open the hint file */
199
 
        fd = fopen(hintsFileName, "r");
200
 
        if (fd == NULL) {
201
 
            printf("cannot open hints file\n");
202
 
        } else {
203
 
            /* iterate over hints file */
204
 
            while(fgets(hintString, MAX_HINT_STR, fd) != NULL) {
205
 
                if (strncmp(hintString, "MPIIO_HINT", strlen("MPIIO_HINT")) == 0) {
206
 
                    ExtractHint(settingVal, valueVal, hintString);
207
 
                    MPI_Info_set(*mpiHints, settingVal, valueVal);
208
 
                }
209
 
            }
210
 
            /* close the hints files */
211
 
            if (fclose(fd) != 0) printf("cannot close hints file\n");
212
 
        }
213
 
    }
214
 
} /* SetHints() */
215
 
 
216
 
 
217
 
/******************************************************************************/
218
 
/*
219
 
 * Show all hints (key/value pairs) in an MPI_Info object.
220
 
 */
221
 
 
222
 
void ShowHints(MPI_Info * mpiHints)
223
 
{
224
 
    char key[MPI_MAX_INFO_VAL],
225
 
         value[MPI_MAX_INFO_VAL];
226
 
    int  flag,
227
 
         i,
228
 
         nkeys;
229
 
 
230
 
    MPI_Info_get_nkeys(*mpiHints, &nkeys);
231
 
 
232
 
    for (i = 0; i < nkeys; i++) {
233
 
        MPI_Info_get_nthkey(*mpiHints, i, key);
234
 
        MPI_Info_get(*mpiHints, key, MPI_MAX_INFO_VAL-1, value, &flag);
235
 
        printf("mpiHint[%d]: %s = %s\n", i, key, value);
236
 
    }
237
 
} /* ShowHints() */
238
 
 
239
 
 
240
 
 
241
 
 
242
 
#ifdef __cplusplus
243
 
extern "C" {
244
 
#endif
245
 
 
246
 
extern int adios_nssi_filter_is_anon_dim(
247
 
        int fd,
248
 
        const char *dimname);
249
 
extern void adios_nssi_filter_set_anon_dim(
250
 
        int fd,
251
 
        const char *dimname,
252
 
        const uint64_t dimvalue);
253
 
 
254
 
#ifdef __cplusplus
255
 
}
256
 
#endif
257
 
 
258
 
void agg_and_write(const int64_t fd)
259
 
{
260
 
    if (DEBUG>2) printf("myrank(%d): enter agg_and_write(fd=%ld)\n", grank, fd);
261
 
 
262
 
    Func_Timer("try_aggregation", try_aggregation(fd););  // aggregate all varids in this file
263
 
 
264
 
    int chunk_count=0;
265
 
    aggregation_chunk_details_t **chunks = get_chunks(fd, &chunk_count);
266
 
 
267
 
    for (int j=0;j<chunk_count;j++) {
268
 
        aggregation_chunk_details_t *chunk = chunks[j];
269
 
 
270
 
        // write all offsets for clients rank to update adios internals
271
 
        for(int i=0;i<chunk->ndims;i++) {
272
 
            uint64_t value=0;
273
 
            if (DEBUG>3) printf("writing myrank(%d) chunk(%d) vpath(%s) vname(%s) opath(%s) oname(%s) odata(%lu)\n",
274
 
                    grank, j, chunk->var_path, chunk->var_name, chunk->offset_path[i], chunk->offset_name[i], chunk->offset[i]);
275
 
            if (adios_nssi_filter_is_anon_dim(chunk->fd, chunk->offset_name[i]) == TRUE) {
276
 
                adios_nssi_filter_set_anon_dim(chunk->fd, chunk->offset_name[i], chunk->offset[i]);
277
 
            } else {
278
 
                if (DEBUG>2) printf("server_rank(%d) writing aggregated offset vname(%s)\n", global_rank, chunk->offset_name[i]);
279
 
                Func_Timer("adios_set_path_var", adios_set_path_var(chunk->fd, chunk->offset_path[i], chunk->offset_name[i]););
280
 
                Func_Timer("adios_write", adios_write(chunk->fd, chunk->offset_name[i], &(chunk->offset[i])););
281
 
            }
282
 
        }
283
 
        for(int i=0;i<chunk->ndims;i++) {
284
 
            uint64_t value=0;
285
 
            if (DEBUG>3) printf("writing myrank(%d) chunk(%d) vpath(%s) vname(%s) dpath(%s) dname(%s) ddata(%lu)\n",
286
 
                    grank, j, chunk->var_path, chunk->var_name, chunk->count_path[i], chunk->count_name[i], chunk->count[i]);
287
 
            if (adios_nssi_filter_is_anon_dim(chunk->fd, chunk->count_name[i]) == TRUE) {
288
 
                adios_nssi_filter_set_anon_dim(chunk->fd, chunk->count_name[i], chunk->count[i]);
289
 
            } else {
290
 
                if (DEBUG>2) printf("server_rank(%d) writing aggregated dim vname(%s)\n", global_rank, chunk->count_name[i]);
291
 
                Func_Timer("adios_set_path_var", adios_set_path_var(chunk->fd, chunk->count_path[i], chunk->count_name[i]););
292
 
                Func_Timer("adios_write", adios_write(chunk->fd, chunk->count_name[i], &(chunk->count[i])););
293
 
            }
294
 
        }
295
 
 
296
 
        if (DEBUG>3) printf("writing myrank(%d) vname(%s)\n", grank, chunk->var_name);
297
 
        if (DEBUG>2) printf("server_rank(%d) writing aggregated array vname(%s)\n", global_rank, chunk->var_name);
298
 
        Func_Timer("adios_set_path_var", adios_set_path_var(chunk->fd, chunk->var_path, chunk->var_name););
299
 
        Func_Timer("adios_write", adios_write(chunk->fd, chunk->var_name, chunk->buf););
300
 
 
301
 
//        cleanup_aggregation_chunks(fd, chunk->var_name);
302
 
    }
303
 
 
304
 
    cleanup_aggregation_chunks(fd);
305
 
 
306
 
    if (DEBUG>2) printf("myrank(%d): exit agg_and_write(fd=%ld)\n", grank, fd);
307
 
 
308
 
    return;
309
 
}
310
 
 
311
 
/* -------------------- SERVER-SIDE STUBS ---------- */
312
 
 
313
 
 
314
 
/**
315
 
 * @brief Open a netcdf dataset.
316
 
 *
317
 
 * Open an ADIOS dataset.
318
 
 */
319
 
int nssi_staging_open_stub(
320
 
        const unsigned long request_id,
321
 
        const nssi_remote_pid *caller,
322
 
        const adios_open_args *args,
323
 
        const nssi_rma *data_addr,
324
 
        const nssi_rma *res_addr)
325
 
{
326
 
    int rc = 0;
327
 
    char omode[2];
328
 
    adios_open_res res;  /* this is what we send back to the client */
329
 
    MPI_Info mpiHints = MPI_INFO_NULL;
330
 
 
331
 
    int64_t fd;
332
 
 
333
 
    memset(&res, 0, sizeof(res));
334
 
 
335
 
    if (DEBUG>2) printf("myrank(%d): enter nssi_staging_open_stub(%s, %d)\n", grank, args->fname, args->mode);
336
 
 
337
 
//    SetHints(&mpiHints, "");
338
 
//    ShowHints(&mpiHints);
339
 
 
340
 
    fd = open_file_get(args->fname);
341
 
    if (DEBUG>3) printf("myrank(%d): nssi_staging_open_stub(%s, %d) open_file_get()==%d\n", grank, args->fname, args->mode, fd);
342
 
    if (fd == -1) {
343
 
        omode[0]='\0';
344
 
        omode[1]='\0';
345
 
        switch(args->mode) {
346
 
        case ADIOS_MODE_READ:
347
 
            omode[0]='r';
348
 
            break;
349
 
        case ADIOS_MODE_WRITE:
350
 
            omode[0]='w';
351
 
            break;
352
 
        case ADIOS_MODE_APPEND:
353
 
            omode[0]='a';
354
 
            break;
355
 
        case ADIOS_MODE_UPDATE:
356
 
            omode[0]='u';
357
 
            break;
358
 
        default:
359
 
            break;
360
 
        }
361
 
 
362
 
        if (DEBUG>3) printf("start adios_open\n");
363
 
        if (args->use_single_server==TRUE) {
364
 
            if (DEBUG>3) printf("adios_open: using MPI_COMM_SELF\n");
365
 
            Func_Timer("adios_open", rc = adios_open(&fd, args->gname, args->fname, omode, &comm_self););
366
 
        } else {
367
 
            if (DEBUG>3) printf("adios_open: using MPI_COMM_WORLD\n");
368
 
            Func_Timer("adios_open", rc = adios_open(&fd, args->gname, args->fname, omode, &comm_world););
369
 
        }
370
 
        if (rc != 0) {
371
 
            printf("Error opening file \"%s\": %d\n", args->fname, rc);
372
 
            goto cleanup;
373
 
        }
374
 
        if (DEBUG>3) printf("end adios_open\n");
375
 
 
376
 
        open_file_add(args->fname, fd);
377
 
 
378
 
        add_file(fd, WRITE_CACHING_COLLECTIVE);
379
 
    }
380
 
 
381
 
    if (DEBUG>2) printf("myrank(%d): exit nssi_staging_open_stub(%s, %d): fd=%ld, fd=%p\n", grank, args->fname, args->mode, fd, fd);
382
 
 
383
 
    res.fd=fd;
384
 
 
385
 
cleanup:
386
 
    /* send the ncid and return code back to client */
387
 
    rc = nssi_send_result(caller, request_id, rc, &res, res_addr);
388
 
 
389
 
    return rc;
390
 
}
391
 
 
392
 
int nssi_staging_group_size_stub(
393
 
        const unsigned long request_id,
394
 
        const nssi_remote_pid *caller,
395
 
        const adios_group_size_args *args,
396
 
        const nssi_rma *data_addr,
397
 
        const nssi_rma *res_addr)
398
 
{
399
 
    int rc = 0;
400
 
    uint64_t total_size=0;
401
 
 
402
 
    if (DEBUG>2) printf("myrank(%d): enter nssi_staging_group_size_stub(fd=%ld, pg_size=%ld)\n", grank, args->fd, args->data_size);
403
 
 
404
 
    Func_Timer("adios_group_size", rc = adios_group_size(args->fd, args->data_size, &total_size););
405
 
    if (rc != 0) {
406
 
        printf("adios_group_size failed: %d\n", rc);
407
 
    }
408
 
 
409
 
    /* send result to client */
410
 
    rc = nssi_send_result(caller, request_id, rc, NULL, res_addr);
411
 
 
412
 
    if (DEBUG>2) printf("myrank(%d): exit nssi_staging_group_size_stub(%ld)\n", grank, args->fd);
413
 
 
414
 
    return rc;
415
 
}
416
 
 
417
 
int nssi_staging_close_stub(
418
 
        const unsigned long request_id,
419
 
        const nssi_remote_pid *caller,
420
 
        const adios_close_args *args,
421
 
        const nssi_rma *data_addr,
422
 
        const nssi_rma *res_addr)
423
 
{
424
 
    int rc = 0;
425
 
 
426
 
    if (DEBUG>2) printf("myrank(%d): enter nssi_staging_close_stub(%ld, %s)\n", grank, args->fd, args->fname);
427
 
 
428
 
    agg_and_write(args->fd);
429
 
 
430
 
    Func_Timer("adios_close", adios_close(args->fd););
431
 
 
432
 
    open_file_del(args->fname);
433
 
 
434
 
    /* send result to client */
435
 
    rc = nssi_send_result(caller, request_id, rc, NULL, res_addr);
436
 
 
437
 
    if (DEBUG>2) printf("myrank(%d): exit nssi_staging_close_stub(%ld, %s)\n", grank, args->fd, args->fname);
438
 
 
439
 
    return rc;
440
 
}
441
 
 
442
 
int nssi_staging_read_stub(
443
 
        const unsigned long request_id,
444
 
        const nssi_remote_pid *caller,
445
 
        const adios_read_args *args,
446
 
        const nssi_rma *data_addr,
447
 
        const nssi_rma *res_addr)
448
 
{
449
 
    int rc = 0;
450
 
    adios_read_res res;
451
 
    char vpathname[ADIOS_PATH_MAX];
452
 
    int pathlen;
453
 
    char *v=NULL;
454
 
 
455
 
    if (DEBUG>2) printf("myrank(%d): enter nssi_staging_read_stub(%ld)\n", grank, args->fd);
456
 
 
457
 
//    pathlen=strlen(args->vpath);
458
 
//    if (args->vpath[pathlen-1]=='/') {
459
 
//        sprintf(vpathname, "%s%s", args->vpath, args->vname);
460
 
//    } else {
461
 
//        sprintf(vpathname, "%s/%s", args->vpath, args->vname);
462
 
//    }
463
 
 
464
 
 
465
 
    for (int i=0;i<args->offsets.offsets_len;i++) {
466
 
        uint64_t *odata=(uint64_t *)malloc(sizeof(uint8_t));
467
 
        *odata = args->offsets.offsets_val[i].vdata;
468
 
        Func_Timer("adios_set_path_var", adios_set_path_var(args->fd, args->offsets.offsets_val[i].vpath, args->offsets.offsets_val[i].vname););
469
 
        Func_Timer("adios_read", adios_read(args->fd, args->offsets.offsets_val[i].vname, odata, 8););
470
 
    }
471
 
    for (int i=0;i<args->ldims.ldims_len;i++) {
472
 
        uint64_t *ddata=(uint64_t *)malloc(sizeof(uint8_t));
473
 
        *ddata = args->ldims.ldims_val[i].vdata;
474
 
        Func_Timer("adios_set_path_var", adios_set_path_var(args->fd, args->ldims.ldims_val[i].vpath, args->ldims.ldims_val[i].vname););
475
 
        Func_Timer("adios_read", adios_read(args->fd, args->ldims.ldims_val[i].vname, ddata, 8););
476
 
    }
477
 
 
478
 
    v=(char *)calloc(args->max_read, 1);
479
 
 
480
 
    Func_Timer("adios_set_path_var", adios_set_path_var(args->fd, args->vpath, args->vname););
481
 
    Func_Timer("adios_read", adios_read(args->fd, args->vname, v, args->max_read););
482
 
 
483
 
    res.bytes_read=args->max_read;
484
 
 
485
 
    Func_Timer("nssi_put_data", rc = nssi_put_data(caller, v, res.bytes_read, data_addr, -1););
486
 
    if (rc != NSSI_OK) {
487
 
        printf("Could not put var data on client\n");
488
 
        goto cleanup;
489
 
    }
490
 
 
491
 
cleanup:
492
 
    /* send result to client */
493
 
    rc = nssi_send_result(caller, request_id, rc, &res, res_addr);
494
 
 
495
 
    if (DEBUG>2) printf("myrank(%d): exit nssi_staging_read_stub(%ld)\n", grank, args->fd);
496
 
 
497
 
    return rc;
498
 
}
499
 
 
500
 
int nssi_staging_write_stub(
501
 
        const unsigned long request_id,
502
 
        const nssi_remote_pid *caller,
503
 
        const adios_write_args *args,
504
 
        const nssi_rma *data_addr,
505
 
        const nssi_rma *res_addr)
506
 
{
507
 
    int rc = 0;
508
 
    adios_write_res res;
509
 
    char vpathname[ADIOS_PATH_MAX];
510
 
    int pathlen;
511
 
    char *v=NULL;
512
 
    int i=0;
513
 
 
514
 
    if (DEBUG>2) printf("myrank(%d): enter nssi_staging_write_stub(fd=%ld, vsize=%ld)\n", grank, args->fd, args->vsize);
515
 
 
516
 
//    pathlen=strlen(args->vpath);
517
 
//    if (args->vpath[pathlen-1]=='/') {
518
 
//        sprintf(vpathname, "%s%s", args->vpath, args->vname);
519
 
//    } else {
520
 
//        sprintf(vpathname, "%s/%s", args->vpath, args->vname);
521
 
//    }
522
 
 
523
 
    v=(char *)malloc(args->vsize);
524
 
 
525
 
    Func_Timer("nssi_get_data", rc = nssi_get_data(caller, v, args->vsize, data_addr););
526
 
    if (rc != NSSI_OK) {
527
 
        printf("Could not get var data on client\n");
528
 
        goto cleanup;
529
 
    }
530
 
 
531
 
    if (DEBUG>3) printf("server_rank(%d) vname(%s) vsize(%ld) is_scalar(%d) writer_rank(%ld)\n", global_rank, args->vname, args->vsize, args->is_scalar, args->writer_rank);
532
 
 
533
 
    if (!args->is_scalar) {
534
 
        if (DEBUG>2) printf("server_rank(%d) caching non-scalar vname(%s) vsize(%ld)\n", global_rank, args->vname, args->vsize);
535
 
        if (DEBUG>3) printf("allocated v(%p), len(%ld)\n", v, args->vsize);
536
 
        aggregation_chunk_details_t *chunk=NULL;
537
 
        chunk = new aggregation_chunk_details_t;
538
 
        chunk->fd = args->fd;
539
 
        strcpy(chunk->var_path, args->vpath);
540
 
        strcpy(chunk->var_name, args->vname);
541
 
        chunk->ndims = args->offsets.offsets_len;
542
 
        chunk->buf = v;
543
 
        chunk->atype = (enum ADIOS_DATATYPES)args->atype;
544
 
        chunk->len   = args->vsize;
545
 
        chunk->num_elements = 1;
546
 
        for (int i=0;i<args->ldims.ldims_len;i++) {
547
 
            chunk->num_elements *= args->ldims.ldims_val[i].vdata;
548
 
        }
549
 
        chunk->offset_path = (char **)calloc(args->offsets.offsets_len, sizeof(char *));
550
 
        chunk->offset_name = (char **)calloc(args->offsets.offsets_len, sizeof(char *));
551
 
        chunk->offset = (uint64_t *)calloc(args->offsets.offsets_len, sizeof(uint64_t));
552
 
        for (int i=0;i<args->offsets.offsets_len;i++) {
553
 
            chunk->offset_path[i] = strdup(args->offsets.offsets_val[i].vpath);
554
 
            chunk->offset_name[i] = strdup(args->offsets.offsets_val[i].vname);
555
 
            chunk->offset[i] = args->offsets.offsets_val[i].vdata;
556
 
        }
557
 
        chunk->count_path = (char **)calloc(args->ldims.ldims_len, sizeof(char *));
558
 
        chunk->count_name = (char **)calloc(args->ldims.ldims_len, sizeof(char *));
559
 
        chunk->count  = (uint64_t *)calloc(args->ldims.ldims_len, sizeof(uint64_t));;
560
 
        for (int i=0;i<args->ldims.ldims_len;i++) {
561
 
            chunk->count_path[i] = strdup(args->ldims.ldims_val[i].vpath);
562
 
            chunk->count_name[i] = strdup(args->ldims.ldims_val[i].vname);
563
 
            chunk->count[i] = args->ldims.ldims_val[i].vdata;
564
 
        }
565
 
        add_chunk(chunk);
566
 
 
567
 
    } else {
568
 
        if (DEBUG>2) printf("server_rank(%d) writing scalar vname(%s) vsize(%ld)\n", global_rank, args->vname, args->vsize);
569
 
        Func_Timer("adios_set_path_var", adios_set_path_var(args->fd, args->vpath, args->vname););
570
 
        Func_Timer("adios_write", adios_write(args->fd, args->vname, v););
571
 
 
572
 
        free(v);
573
 
    }
574
 
 
575
 
    res.bytes_written=args->vsize;
576
 
 
577
 
cleanup:
578
 
 
579
 
    /* send result to client */
580
 
    rc = nssi_send_result(caller, request_id, rc, &res, res_addr);
581
 
 
582
 
    if (DEBUG>2) printf("myrank(%d): exit nssi_staging_write_stub(fd=%ld, vsize=%ld)\n", grank, args->fd, args->vsize);
583
 
 
584
 
    return rc;
585
 
}
586
 
 
587
 
int nssi_staging_start_calc_stub(
588
 
        const unsigned long request_id,
589
 
        const nssi_remote_pid *caller,
590
 
        const adios_start_calc_args *args,
591
 
        const nssi_rma *data_addr,
592
 
        const nssi_rma *res_addr)
593
 
{
594
 
    int rc = 0;
595
 
 
596
 
    if (DEBUG>2) printf("myrank(%d): enter nssi_staging_start_calc_stub(%ld)\n", grank, args->fd);
597
 
 
598
 
    agg_and_write(args->fd);
599
 
 
600
 
    /* send result to client */
601
 
    rc = nssi_send_result(caller, request_id, rc, NULL, res_addr);
602
 
 
603
 
    if (DEBUG>2) printf("myrank(%d): exit nssi_staging_start_calc_stub(%ld)\n", grank, args->fd);
604
 
 
605
 
    return rc;
606
 
}
607
 
 
608
 
int nssi_staging_stop_calc_stub(
609
 
        const unsigned long request_id,
610
 
        const nssi_remote_pid *caller,
611
 
        const adios_stop_calc_args *args,
612
 
        const nssi_rma *data_addr,
613
 
        const nssi_rma *res_addr)
614
 
{
615
 
    int rc = 0;
616
 
 
617
 
    if (DEBUG>2) printf("myrank(%d): enter nssi_staging_stop_calc_stub(%ld)\n", grank, args->fd);
618
 
 
619
 
 
620
 
    /* send result to client */
621
 
    rc = nssi_send_result(caller, request_id, rc, NULL, res_addr);
622
 
 
623
 
    if (DEBUG>2) printf("myrank(%d): exit nssi_staging_stop_calc_stub(%ld)\n", grank, args->fd);
624
 
 
625
 
    return rc;
626
 
}
627
 
 
628
 
int nssi_staging_end_iter_stub(
629
 
        const unsigned long request_id,
630
 
        const nssi_remote_pid *caller,
631
 
        const adios_end_iter_args *args,
632
 
        const nssi_rma *data_addr,
633
 
        const nssi_rma *res_addr)
634
 
{
635
 
    int rc = 0;
636
 
 
637
 
    if (DEBUG>2) printf("myrank(%d): enter nssi_staging_end_iter_stub(%ld)\n", grank, args->fd);
638
 
 
639
 
 
640
 
    /* send result to client */
641
 
    rc = nssi_send_result(caller, request_id, rc, NULL, res_addr);
642
 
 
643
 
    if (DEBUG>2) printf("myrank(%d): exit nssi_staging_end_iter_stub(%ld)\n", grank, args->fd);
644
 
 
645
 
    return rc;
646
 
}
647
 
 
648
 
int nssi_staging_finalize_stub(
649
 
        const unsigned long request_id,
650
 
        const nssi_remote_pid *caller,
651
 
        const adios_finalize_args *args,
652
 
        const nssi_rma *data_addr,
653
 
        const nssi_rma *res_addr)
654
 
{
655
 
    int rc = 0;
656
 
 
657
 
    if (DEBUG>2) printf("myrank(%d): enter nssi_staging_finalize_stub(%s)\n", grank, args->client_id);
658
 
 
659
 
    /*
660
 
     *
661
 
     * do nothing
662
 
     *
663
 
     */
664
 
 
665
 
    /* send result to client */
666
 
    rc = nssi_send_result(caller, request_id, rc, NULL, res_addr);
667
 
 
668
 
    if (DEBUG>2) printf("myrank(%d): exit nssi_staging_finalize_stub(%s)\n", grank, args->client_id);
669
 
 
670
 
    return rc;
671
 
}
672
 
 
673
 
/* -------- END SERVER-SIDE STUBS -------------- */
674
 
 
675
 
int nssi_staging_server_init(const char *adios_config_file)
676
 
{
677
 
    int rc=NSSI_OK;
678
 
 
679
 
    if (DEBUG>3) printf("start adios_init(%s)\n", adios_config_file);
680
 
    rc = adios_init(adios_config_file);
681
 
    if (rc != 1) {
682
 
        printf("adios_init() failed: %d\n", rc);
683
 
        return(-1);
684
 
    }
685
 
    if (DEBUG>3) printf("end adios_init(%s)\n", adios_config_file);
686
 
 
687
 
 
688
 
    /* register server stubs */
689
 
    NSSI_REGISTER_SERVER_STUB(ADIOS_OPEN_OP, nssi_staging_open_stub, adios_open_args, adios_open_res);
690
 
    NSSI_REGISTER_SERVER_STUB(ADIOS_GROUP_SIZE_OP, nssi_staging_group_size_stub, adios_group_size_args, void);
691
 
    NSSI_REGISTER_SERVER_STUB(ADIOS_READ_OP, nssi_staging_read_stub, adios_read_args, adios_read_res);
692
 
    NSSI_REGISTER_SERVER_STUB(ADIOS_WRITE_OP, nssi_staging_write_stub, adios_write_args, adios_write_res);
693
 
    NSSI_REGISTER_SERVER_STUB(ADIOS_END_ITER_OP, nssi_staging_end_iter_stub, adios_end_iter_args, void);
694
 
    NSSI_REGISTER_SERVER_STUB(ADIOS_START_CALC_OP, nssi_staging_start_calc_stub, adios_start_calc_args, void);
695
 
    NSSI_REGISTER_SERVER_STUB(ADIOS_STOP_CALC_OP, nssi_staging_stop_calc_stub, adios_stop_calc_args, void);
696
 
    NSSI_REGISTER_SERVER_STUB(ADIOS_CLOSE_OP, nssi_staging_close_stub, adios_close_args, void);
697
 
    NSSI_REGISTER_SERVER_STUB(ADIOS_FINALIZE_OP, nssi_staging_finalize_stub, adios_finalize_args, void);
698
 
 
699
 
    return 0;
700
 
}
701
 
 
702
 
 
703
 
 
704
 
static void generate_contact_info(nssi_remote_pid *myid)
705
 
{
706
 
    nssi_remote_pid *all_pids=NULL;
707
 
    int rank, np;
708
 
    char contact_path[1024];
709
 
 
710
 
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
711
 
    //printf("rank (%d)\n", rank);
712
 
 
713
 
    if (rank==0) {
714
 
        MPI_Comm_size(MPI_COMM_WORLD, &np);
715
 
        all_pids=(nssi_remote_pid *)malloc(np*sizeof(nssi_remote_pid));
716
 
    }
717
 
    MPI_Gather(myid, sizeof(nssi_remote_pid), MPI_BYTE,
718
 
               all_pids, sizeof(nssi_remote_pid), MPI_BYTE,
719
 
               0, MPI_COMM_WORLD);
720
 
    if (rank==0) {
721
 
        char *contact_file=getenv("ADIOS_NSSI_CONTACT_INFO");
722
 
        if (contact_file==NULL) {
723
 
            printf("ADIOS_NSSI_CONTACT_INFO env var is undefined.\n");
724
 
            free(all_pids);
725
 
            return;
726
 
        }
727
 
//        sprintf(contact_path, "%s.%04d", contact_file, rank);
728
 
        sprintf(contact_path, "%s", contact_file);
729
 
        if (DEBUG>3) printf("creating contact file (%s)\n", contact_path);
730
 
        FILE *f=fopen(contact_path, "w");
731
 
        if (f==NULL) {
732
 
            perror("fopen");
733
 
        }
734
 
        for (int i=0;i<np;i++) {
735
 
            fprintf(f, "%u@%u@%s@%u\n",
736
 
                    all_pids[i].nid, all_pids[i].pid,
737
 
                    all_pids[i].hostname, (unsigned int)ntohs(all_pids[i].port));
738
 
        }
739
 
//        fprintf(f, "%u@%u@%s@%u\n",
740
 
//                myid->nid, myid->pid,
741
 
//                myid->hostname, (unsigned int)ntohs(myid->port));
742
 
        fclose(f);
743
 
        free(all_pids);
744
 
    }
745
 
    MPI_Barrier(MPI_COMM_WORLD);
746
 
}
747
 
 
748
 
 
749
 
/**
750
 
 * @brief The LWFS xfer-server.
751
 
 */
752
 
int main(int argc, char **argv)
753
 
{
754
 
    int rc = NSSI_OK;
755
 
 
756
 
    nssi_service nssi_svc;
757
 
//    log_level debug_level;
758
 
    char logfile[1024];
759
 
    int rank, np;
760
 
 
761
 
    MPI_Init(&argc, &argv);
762
 
    MPI_Comm_rank(MPI_COMM_WORLD, &global_rank);
763
 
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
764
 
    MPI_Comm_size(MPI_COMM_WORLD, &np);
765
 
    grank=rank;
766
 
    gsize=np;
767
 
 
768
 
    /* options that can be overriden by the command-line */
769
 
    bool daemon_flag = false;
770
 
    int verbose = 5;  /* default debug_level */
771
 
    int num_threads = 0;
772
 
    int server_pid = 128;   /* process ID of the server */
773
 
    int server_port = 7728; /* TCP port of the server */
774
 
 
775
 
    memset(&nssi_svc, 0, sizeof(nssi_service));
776
 
 
777
 
    /* initialize and enable logging */
778
 
//    if (args_info.logfile_arg != NULL) {
779
 
//        sprintf(logfile, "%s.%04d", "nssi_staging_server.log", rank);
780
 
//        logger_init((log_level)verbose, logfile);
781
 
//    } else {
782
 
//        logger_init((log_level)verbose, NULL);
783
 
//    }
784
 
//    netcdf_debug_level=(log_level)(log_level)args_info.verbose_arg;
785
 
//    debug_level = (log_level)args_info.verbose_arg;
786
 
 
787
 
//    logger_init((log_level)verbose, NULL);
788
 
 
789
 
    if (daemon_flag) {
790
 
        nssi_daemonize();
791
 
    }
792
 
 
793
 
#ifdef HAVE_PORTALS
794
 
    nssi_ptl_init(PTL_IFACE_SERVER, server_pid);
795
 
    rc = nssi_rpc_init(NSSI_RPC_PTL, NSSI_RPC_XDR);
796
 
    if (rc != NSSI_OK) {
797
 
        printf("could not init rpc: %s\n",
798
 
                nssi_err_str(rc));
799
 
        return rc;
800
 
    }
801
 
    nssi_remote_pid myid;
802
 
    memset(&myid, 0, sizeof(nssi_remote_pid));
803
 
    nssi_ptl_get_id(&myid);
804
 
    generate_contact_info(&myid);
805
 
#endif
806
 
#ifdef HAVE_INFINIBAND
807
 
    memset(&nssi_svc.req_addr.match_id, 0, sizeof(nssi_remote_pid));
808
 
    strcpy(nssi_svc.req_addr.match_id.hostname, args_info.server_addr_arg);
809
 
    nssi_svc.req_addr.match_id.port = args_info.server_port_arg;
810
 
 
811
 
    nssi_ib_init(&nssi_svc);
812
 
    rc = nssi_rpc_init(NSSI_RPC_IB, NSSI_RPC_XDR);
813
 
    if (rc != NSSI_OK) {
814
 
        printf("could not init rpc: %s\n",
815
 
                nssi_err_str(rc));
816
 
        return rc;
817
 
    }
818
 
    generate_contact_info(&nssi_svc.req_addr.match_id);
819
 
#endif
820
 
 
821
 
    if (DEBUG>3) printf("Initialize staging service\n");
822
 
 
823
 
    /* initialize the lwfs service */
824
 
    rc = nssi_service_init(0, NSSI_SHORT_REQUEST_SIZE, &nssi_svc);
825
 
    if (rc != NSSI_OK) {
826
 
        printf("could not init nssi_svc: %s\n",
827
 
                nssi_err_str(rc));
828
 
        return -1;
829
 
    }
830
 
 
831
 
    /* initialize staging service */
832
 
    rc = nssi_staging_server_init(argv[1]);
833
 
 
834
 
    /* start processing requests */
835
 
    nssi_svc.max_reqs = -1;
836
 
    rc = nssi_service_start(&nssi_svc, num_threads);
837
 
    if (rc != NSSI_OK) {
838
 
        printf("exited nssi_svc: %s\n",
839
 
                nssi_err_str(rc));
840
 
    }
841
 
 
842
 
    adios_finalize(rank);
843
 
 
844
 
    /* shutdown the nssi_svc */
845
 
    if (DEBUG>3) printf("shutting down service library\n");
846
 
    nssi_service_fini(&nssi_svc);
847
 
 
848
 
    nssi_rpc_fini();
849
 
 
850
 
    MPI_Finalize();
851
 
 
852
 
    return rc;
853
 
}