3
* @brief Driver for the ADIOS NSSI staging server.
5
* @author Ron Oldfield (raoldfi\@sandia.gov).
7
* $Date: 2007-02-27 15:30:26 -0700 (Tue, 27 Feb 2007) $.
11
#ifndef __STDC_CONSTANT_MACROS
12
#define __STDC_CONSTANT_MACROS
26
#include "nssi_ptls.h"
28
#ifdef HAVE_INFINIBAND
31
#include "nssi_server.h"
32
#include "nssi_logger.h"
34
#include "adios_nssi_args.h"
35
#include "adios_nssi_config.h"
39
#include "aggregation.h"
49
#ifdef __LIBCATAMOUNT__
50
#define ntohs(value) 0
54
// Key type of open_file_map: identifies an open file by its name, which is
// stored in a fixed-size buffer.
// NOTE(review): the struct header, any further members and the constructor
// bodies are not visible in this chunk.
/* Need a struct to encapsulate open file info.
57
// File name; capacity is ADIOS_PATH_MAX characters.
char ofname[ADIOS_PATH_MAX];
60
// Construct from a file name only.
open_file(const char *name) {
64
// Construct from a file name plus a descriptor value.
open_file(const char *name, const int64_t desc) {
69
// Comparison functor used as the ordering of open_file_map.
// Returns TRUE when of1's name compares lower than of2's name via strcmp();
// the value returned otherwise is not visible in this chunk.
/* Need a comparison operator to pass into the open_file_map
73
bool operator()(const struct open_file &of1, const struct open_file &of2) const
75
// log_debug(rpc_debug_level, "cqp1.qp_num == %u", cqp1.qp_num);
76
// log_debug(rpc_debug_level, "cqp2.qp_num == %u", cqp2.qp_num);
78
// Order purely by file name.
if (strcmp(of1.ofname, of2.ofname) <0) return TRUE;
84
/* Map of open files */
85
// Maps an open_file key (ordered by open_file_lt, i.e. by name) to the
// int64_t descriptor stored for that file.
static map<struct open_file, int64_t, open_file_lt> open_file_map;
86
typedef map<struct open_file, int64_t, open_file_lt>::iterator open_file_map_iterator_t;
87
typedef pair<struct open_file, int64_t> open_file_map_pair_t;
88
// NOTE(review): no lock/unlock/wait on this mutex or condition variable is
// visible in this chunk; confirm whether the map helpers are meant to use them.
static pthread_mutex_t open_file_map_mutex=PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
89
static pthread_cond_t open_file_map_cond =PTHREAD_COND_INITIALIZER;
92
// Rank of this server process in MPI_COMM_WORLD; -1 until main() sets it.
static int global_rank=-1;
96
/* -------------------- PRIVATE FUNCTIONS ---------- */
97
/*
 * Record fd as the descriptor of the file named fname in open_file_map.
 * An existing entry for the same name is overwritten (map operator[]).
 */
void open_file_add(char *fname, int64_t fd)
99
// fname is converted to an open_file key for the lookup/insert.
open_file_map[fname]=fd;
102
/*
 * Look up the descriptor recorded for the file named fname.
 * NOTE(review): the construction of the key `of` and the values returned for
 * the found / not-found cases are not visible in this chunk.
 */
int64_t open_file_get(char *fname)
107
open_file_map_iterator_t iter=open_file_map.find(of);
108
// Entry exists for this name.
if (iter != open_file_map.end()) {
114
/*
 * Remove the entry for the file named fname from open_file_map.
 * Erasing by key is a no-op when no such entry exists.
 */
void open_file_del(char *fname)
116
// NOTE(review): iter is not used by any visible line -- confirm and drop it.
open_file_map_iterator_t iter;
119
open_file_map.erase(of);
126
// Addressable copies of the predefined communicators; the open stub passes
// &comm_self or &comm_world to adios_open().
MPI_Comm comm_self=MPI_COMM_SELF;
127
MPI_Comm comm_world=MPI_COMM_WORLD;
131
* The next 3 utility functions are lifted from IOR.
133
/******************************************************************************/
135
/*
 * Extract key/value pair from hint string.
 *
 * Splits a string of the form "<setting>=<value>" and copies the two parts
 * out. A leading "MPIIO_HINT__" prefix on the setting name is stripped.
 *
 * settingVal: out, receives the setting name; must be large enough to hold
 *             hintString.
 * valueVal:   out, receives the value, or "" when the string carries no value.
 * hintString: in/out, the string to split; it is modified by strtok().
 */
void
ExtractHint(char * settingVal,
            char * valueVal,
            char * hintString)
{
    static const char prefix[] = "MPIIO_HINT__";
    char *settingPtr;
    char *valuePtr;

    settingPtr = strtok(hintString, "=");
    valuePtr   = strtok(NULL, " \t\r\n");

    /* strtok() yields NULL for an empty/all-delimiter string; nothing to copy. */
    if (settingPtr == NULL) {
        settingVal[0] = '\0';
        valueVal[0]   = '\0';
        return;
    }

    /* Strip the prefix only when it is at the very start of the name. */
    if (strncmp(settingPtr, prefix, strlen(prefix)) == 0) {
        settingPtr += strlen(prefix);
    }

    strcpy(settingVal, settingPtr);
    /* A hint without "=<value>" has no second token; report an empty value. */
    strcpy(valueVal, (valuePtr != NULL) ? valuePtr : "");
} /* ExtractHint() */
160
/******************************************************************************/
162
* Set hints for MPIIO, HDF5, or NCMPI.
164
#define MAX_HINT_STR 1024
166
SetHints(MPI_Info * mpiHints, char * hintsFileName)
168
char hintString[MAX_HINT_STR],
169
settingVal[MAX_HINT_STR],
170
valueVal[MAX_HINT_STR];
171
extern char ** environ;
176
* This routine checks for hints from the environment and/or from the
177
* hints files. The hints are of the form:
178
* 'MPIIO_HINT__<hint>=<value>', <hint> is the full name of the hint
179
* to be set, and <value> is the hint value.
180
* E.g., 'setenv MPIIO_HINT__panfs_concurrent_write 1'
181
* or 'MPIIO_HINT__panfs_concurrent_write=1' in the hints file.
183
MPI_Info_create(mpiHints);
185
/* get hints from environment */
186
for (i = 0; environ[i] != NULL; i++) {
187
/* if this is an IOR_HINT, pass the hint to the info object */
188
if (strncmp(environ[i], "MPIIO_HINT", strlen("MPIIO_HINT")) == 0) {
189
strcpy(hintString, environ[i]);
190
ExtractHint(settingVal, valueVal, hintString);
191
MPI_Info_set(*mpiHints, settingVal, valueVal);
195
/* get hints from hints file */
196
if (strcmp(hintsFileName, "") != 0) {
198
/* open the hint file */
199
fd = fopen(hintsFileName, "r");
201
printf("cannot open hints file\n");
203
/* iterate over hints file */
204
while(fgets(hintString, MAX_HINT_STR, fd) != NULL) {
205
if (strncmp(hintString, "MPIIO_HINT", strlen("MPIIO_HINT")) == 0) {
206
ExtractHint(settingVal, valueVal, hintString);
207
MPI_Info_set(*mpiHints, settingVal, valueVal);
210
/* close the hints files */
211
if (fclose(fd) != 0) printf("cannot close hints file\n");
217
/******************************************************************************/
219
* Show all hints (key/value pairs) in an MPI_Info object.
222
/*
 * Print every key/value pair of *mpiHints to stdout, one per line, as
 * "mpiHint[<index>]: <key> = <value>".
 */
void ShowHints(MPI_Info * mpiHints)
224
char key[MPI_MAX_INFO_VAL],
225
value[MPI_MAX_INFO_VAL];
230
MPI_Info_get_nkeys(*mpiHints, &nkeys);
232
for (i = 0; i < nkeys; i++) {
233
// Fetch the i-th key, then the value stored under it.
MPI_Info_get_nthkey(*mpiHints, i, key);
234
MPI_Info_get(*mpiHints, key, MPI_MAX_INFO_VAL-1, value, &flag);
235
// flag is not consulted before printing.
printf("mpiHint[%d]: %s = %s\n", i, key, value);
246
// Defined elsewhere; agg_and_write() calls this with (fd, dimension name) and
// compares the result against TRUE.
extern int adios_nssi_filter_is_anon_dim(
248
const char *dimname);
// Defined elsewhere; agg_and_write() calls this with
// (fd, dimension name, dimension value).
249
extern void adios_nssi_filter_set_anon_dim(
252
const uint64_t dimvalue);
258
/*
 * Aggregate the cached chunks of file fd and write them out through ADIOS.
 *
 * Runs try_aggregation(fd), fetches the chunk list with get_chunks(), and for
 * each chunk writes its per-dimension offsets, then its per-dimension counts,
 * then its data buffer (adios_set_path_var() followed by adios_write()).
 * Finishes with cleanup_aggregation_chunks(fd).
 *
 * NOTE(review): how the anonymous-dimension branch relates to the
 * adios_write() that follows it (else-branch or unconditional) is not
 * visible in this chunk.
 */
void agg_and_write(const int64_t fd)
260
if (DEBUG>2) printf("myrank(%d): enter agg_and_write(fd=%ld)\n", grank, fd);
262
Func_Timer("try_aggregation", try_aggregation(fd);); // aggregate all varids in this file
265
aggregation_chunk_details_t **chunks = get_chunks(fd, &chunk_count);
267
for (int j=0;j<chunk_count;j++) {
268
aggregation_chunk_details_t *chunk = chunks[j];
270
// write all offsets for clients rank to update adios internals
271
for(int i=0;i<chunk->ndims;i++) {
273
if (DEBUG>3) printf("writing myrank(%d) chunk(%d) vpath(%s) vname(%s) opath(%s) oname(%s) odata(%lu)\n",
274
grank, j, chunk->var_path, chunk->var_name, chunk->offset_path[i], chunk->offset_name[i], chunk->offset[i]);
275
if (adios_nssi_filter_is_anon_dim(chunk->fd, chunk->offset_name[i]) == TRUE) {
276
adios_nssi_filter_set_anon_dim(chunk->fd, chunk->offset_name[i], chunk->offset[i]);
278
if (DEBUG>2) printf("server_rank(%d) writing aggregated offset vname(%s)\n", global_rank, chunk->offset_name[i]);
279
Func_Timer("adios_set_path_var", adios_set_path_var(chunk->fd, chunk->offset_path[i], chunk->offset_name[i]););
280
Func_Timer("adios_write", adios_write(chunk->fd, chunk->offset_name[i], &(chunk->offset[i])););
283
// same treatment for the per-dimension counts (local dimensions)
for(int i=0;i<chunk->ndims;i++) {
285
if (DEBUG>3) printf("writing myrank(%d) chunk(%d) vpath(%s) vname(%s) dpath(%s) dname(%s) ddata(%lu)\n",
286
grank, j, chunk->var_path, chunk->var_name, chunk->count_path[i], chunk->count_name[i], chunk->count[i]);
287
if (adios_nssi_filter_is_anon_dim(chunk->fd, chunk->count_name[i]) == TRUE) {
288
adios_nssi_filter_set_anon_dim(chunk->fd, chunk->count_name[i], chunk->count[i]);
290
if (DEBUG>2) printf("server_rank(%d) writing aggregated dim vname(%s)\n", global_rank, chunk->count_name[i]);
291
Func_Timer("adios_set_path_var", adios_set_path_var(chunk->fd, chunk->count_path[i], chunk->count_name[i]););
292
Func_Timer("adios_write", adios_write(chunk->fd, chunk->count_name[i], &(chunk->count[i])););
296
// finally the variable's data buffer itself
if (DEBUG>3) printf("writing myrank(%d) vname(%s)\n", grank, chunk->var_name);
297
if (DEBUG>2) printf("server_rank(%d) writing aggregated array vname(%s)\n", global_rank, chunk->var_name);
298
Func_Timer("adios_set_path_var", adios_set_path_var(chunk->fd, chunk->var_path, chunk->var_name););
299
Func_Timer("adios_write", adios_write(chunk->fd, chunk->var_name, chunk->buf););
301
// cleanup_aggregation_chunks(fd, chunk->var_name);
304
cleanup_aggregation_chunks(fd);
306
if (DEBUG>2) printf("myrank(%d): exit agg_and_write(fd=%ld)\n", grank, fd);
311
/* -------------------- SERVER-SIDE STUBS ---------- */
315
* @brief Server-side stub for the ADIOS open operation.
317
* Open an ADIOS dataset.
319
/*
 * Server-side stub for ADIOS_OPEN_OP.
 *
 * Looks the file name up in the open-file map, opens the file with
 * adios_open() on MPI_COMM_SELF (args->use_single_server==TRUE) or
 * MPI_COMM_WORLD, records the descriptor with open_file_add() and
 * add_file(), and sends the result structure back to the caller.
 *
 * request_id: id of the request being answered.
 * caller:     remote process that issued the request.
 * args:       decoded open arguments (fname, gname, mode, use_single_server).
 * data_addr:  bulk-data address; not referenced by any visible line.
 * res_addr:   address the result is sent to.
 *
 * NOTE(review): the mode switch body, the condition guarding the open and the
 * return statement are not visible in this chunk.
 */
int nssi_staging_open_stub(
320
const unsigned long request_id,
321
const nssi_remote_pid *caller,
322
const adios_open_args *args,
323
const nssi_rma *data_addr,
324
const nssi_rma *res_addr)
328
adios_open_res res; /* this is what we send back to the client */
329
MPI_Info mpiHints = MPI_INFO_NULL;
333
memset(&res, 0, sizeof(res));
335
if (DEBUG>2) printf("myrank(%d): enter nssi_staging_open_stub(%s, %d)\n", grank, args->fname, args->mode);
337
// SetHints(&mpiHints, "");
338
// ShowHints(&mpiHints);
340
fd = open_file_get(args->fname);
341
// fd is a 64-bit descriptor (open_file_get() returns int64_t), so it is
// printed with %ld like every other fd in this file.
if (DEBUG>3) printf("myrank(%d): nssi_staging_open_stub(%s, %d) open_file_get()==%ld\n", grank, args->fname, args->mode, fd);
346
case ADIOS_MODE_READ:
349
case ADIOS_MODE_WRITE:
352
case ADIOS_MODE_APPEND:
355
case ADIOS_MODE_UPDATE:
362
if (DEBUG>3) printf("start adios_open\n");
363
if (args->use_single_server==TRUE) {
364
if (DEBUG>3) printf("adios_open: using MPI_COMM_SELF\n");
365
Func_Timer("adios_open", rc = adios_open(&fd, args->gname, args->fname, omode, &comm_self););
367
if (DEBUG>3) printf("adios_open: using MPI_COMM_WORLD\n");
368
Func_Timer("adios_open", rc = adios_open(&fd, args->gname, args->fname, omode, &comm_world););
371
printf("Error opening file \"%s\": %d\n", args->fname, rc);
374
if (DEBUG>3) printf("end adios_open\n");
376
open_file_add(args->fname, fd);
378
add_file(fd, WRITE_CACHING_COLLECTIVE);
381
// %p requires a pointer argument; cast the descriptor explicitly.
if (DEBUG>2) printf("myrank(%d): exit nssi_staging_open_stub(%s, %d): fd=%ld, fd=%p\n", grank, args->fname, args->mode, fd, (void *)fd);
386
/* send the file descriptor and return code back to client */
387
rc = nssi_send_result(caller, request_id, rc, &res, res_addr);
392
/*
 * Server-side stub for ADIOS_GROUP_SIZE_OP.
 *
 * Forwards args->fd and args->data_size to adios_group_size() and sends the
 * return code (no result payload) back to the caller.
 * total_size receives the value computed by adios_group_size(); no visible
 * line uses it afterwards.
 */
int nssi_staging_group_size_stub(
393
const unsigned long request_id,
394
const nssi_remote_pid *caller,
395
const adios_group_size_args *args,
396
const nssi_rma *data_addr,
397
const nssi_rma *res_addr)
400
uint64_t total_size=0;
402
if (DEBUG>2) printf("myrank(%d): enter nssi_staging_group_size_stub(fd=%ld, pg_size=%ld)\n", grank, args->fd, args->data_size);
404
Func_Timer("adios_group_size", rc = adios_group_size(args->fd, args->data_size, &total_size););
406
printf("adios_group_size failed: %d\n", rc);
409
/* send result to client */
410
rc = nssi_send_result(caller, request_id, rc, NULL, res_addr);
412
if (DEBUG>2) printf("myrank(%d): exit nssi_staging_group_size_stub(%ld)\n", grank, args->fd);
417
/*
 * Server-side stub for ADIOS_CLOSE_OP.
 *
 * Flushes the aggregated chunks of args->fd with agg_and_write(), closes the
 * file with adios_close(), drops args->fname from the open-file map and sends
 * the return code (no payload) back to the caller.
 *
 * NOTE(review): the return value of adios_close() is not assigned to rc on
 * the visible line, so rc keeps whatever value it had before.
 */
int nssi_staging_close_stub(
418
const unsigned long request_id,
419
const nssi_remote_pid *caller,
420
const adios_close_args *args,
421
const nssi_rma *data_addr,
422
const nssi_rma *res_addr)
426
if (DEBUG>2) printf("myrank(%d): enter nssi_staging_close_stub(%ld, %s)\n", grank, args->fd, args->fname);
428
// write out everything still cached for this file before closing it
agg_and_write(args->fd);
430
Func_Timer("adios_close", adios_close(args->fd););
432
open_file_del(args->fname);
434
/* send result to client */
435
rc = nssi_send_result(caller, request_id, rc, NULL, res_addr);
437
if (DEBUG>2) printf("myrank(%d): exit nssi_staging_close_stub(%ld, %s)\n", grank, args->fd, args->fname);
442
/*
 * Server-side stub for ADIOS_READ_OP.
 *
 * For every offset and local dimension in args, allocates an 8-byte buffer
 * seeded with the client-supplied value and registers it with adios_read().
 * Then allocates a zeroed buffer of args->max_read bytes for the variable
 * itself, registers it with adios_read(), pushes the buffer to the client
 * with nssi_put_data() and sends the result structure back.
 *
 * request_id: id of the request being answered.
 * caller:     remote process that issued the request.
 * args:       decoded read arguments (fd, vpath, vname, max_read, offsets, ldims).
 * data_addr:  client-side address the variable data is put to.
 * res_addr:   address the result is sent to.
 *
 * NOTE(review): no free() of odata, ddata or v is visible in this chunk --
 * confirm who owns these buffers after adios_read().
 */
int nssi_staging_read_stub(
443
const unsigned long request_id,
444
const nssi_remote_pid *caller,
445
const adios_read_args *args,
446
const nssi_rma *data_addr,
447
const nssi_rma *res_addr)
451
char vpathname[ADIOS_PATH_MAX];
455
if (DEBUG>2) printf("myrank(%d): enter nssi_staging_read_stub(%ld)\n", grank, args->fd);
457
// pathlen=strlen(args->vpath);
458
// if (args->vpath[pathlen-1]=='/') {
459
// sprintf(vpathname, "%s%s", args->vpath, args->vname);
461
// sprintf(vpathname, "%s/%s", args->vpath, args->vname);
465
for (int i=0;i<args->offsets.offsets_len;i++) {
466
// A full uint64_t is stored through odata and adios_read() is told the
// buffer is 8 bytes, so the allocation must be sizeof(uint64_t), not 1 byte.
uint64_t *odata=(uint64_t *)malloc(sizeof(*odata));
467
*odata = args->offsets.offsets_val[i].vdata;
468
Func_Timer("adios_set_path_var", adios_set_path_var(args->fd, args->offsets.offsets_val[i].vpath, args->offsets.offsets_val[i].vname););
469
Func_Timer("adios_read", adios_read(args->fd, args->offsets.offsets_val[i].vname, odata, 8););
471
for (int i=0;i<args->ldims.ldims_len;i++) {
472
// Same sizing requirement as for the offsets above.
uint64_t *ddata=(uint64_t *)malloc(sizeof(*ddata));
473
*ddata = args->ldims.ldims_val[i].vdata;
474
Func_Timer("adios_set_path_var", adios_set_path_var(args->fd, args->ldims.ldims_val[i].vpath, args->ldims.ldims_val[i].vname););
475
Func_Timer("adios_read", adios_read(args->fd, args->ldims.ldims_val[i].vname, ddata, 8););
478
v=(char *)calloc(args->max_read, 1);
480
Func_Timer("adios_set_path_var", adios_set_path_var(args->fd, args->vpath, args->vname););
481
Func_Timer("adios_read", adios_read(args->fd, args->vname, v, args->max_read););
483
res.bytes_read=args->max_read;
485
Func_Timer("nssi_put_data", rc = nssi_put_data(caller, v, res.bytes_read, data_addr, -1););
487
printf("Could not put var data on client\n");
492
/* send result to client */
493
rc = nssi_send_result(caller, request_id, rc, &res, res_addr);
495
if (DEBUG>2) printf("myrank(%d): exit nssi_staging_read_stub(%ld)\n", grank, args->fd);
500
/*
 * Server-side stub for ADIOS_WRITE_OP.
 *
 * Allocates args->vsize bytes and pulls the variable data from the client
 * with nssi_get_data(). Non-scalar variables are described in a newly
 * allocated aggregation_chunk_details_t (path, name, type, length, element
 * count, per-dimension offsets and counts); scalar variables are written
 * through adios_set_path_var()/adios_write(). The result reports
 * args->vsize bytes written.
 *
 * NOTE(review): where the chunk is handed to the aggregation cache, where
 * chunk->buf is set, and whether v is freed on the scalar path are not
 * visible in this chunk. The strcpy() calls into chunk->var_path/var_name
 * rely on those fields being large enough -- confirm against aggregation.h.
 */
int nssi_staging_write_stub(
501
const unsigned long request_id,
502
const nssi_remote_pid *caller,
503
const adios_write_args *args,
504
const nssi_rma *data_addr,
505
const nssi_rma *res_addr)
509
char vpathname[ADIOS_PATH_MAX];
514
if (DEBUG>2) printf("myrank(%d): enter nssi_staging_write_stub(fd=%ld, vsize=%ld)\n", grank, args->fd, args->vsize);
516
// pathlen=strlen(args->vpath);
517
// if (args->vpath[pathlen-1]=='/') {
518
// sprintf(vpathname, "%s%s", args->vpath, args->vname);
520
// sprintf(vpathname, "%s/%s", args->vpath, args->vname);
523
// staging buffer for the variable data fetched from the client
v=(char *)malloc(args->vsize);
525
Func_Timer("nssi_get_data", rc = nssi_get_data(caller, v, args->vsize, data_addr););
527
printf("Could not get var data on client\n");
531
if (DEBUG>3) printf("server_rank(%d) vname(%s) vsize(%ld) is_scalar(%d) writer_rank(%ld)\n", global_rank, args->vname, args->vsize, args->is_scalar, args->writer_rank);
533
if (!args->is_scalar) {
534
if (DEBUG>2) printf("server_rank(%d) caching non-scalar vname(%s) vsize(%ld)\n", global_rank, args->vname, args->vsize);
535
if (DEBUG>3) printf("allocated v(%p), len(%ld)\n", v, args->vsize);
536
aggregation_chunk_details_t *chunk=NULL;
537
chunk = new aggregation_chunk_details_t;
538
chunk->fd = args->fd;
539
strcpy(chunk->var_path, args->vpath);
540
strcpy(chunk->var_name, args->vname);
541
// the number of dimensions is taken from the offsets array
chunk->ndims = args->offsets.offsets_len;
543
chunk->atype = (enum ADIOS_DATATYPES)args->atype;
544
chunk->len = args->vsize;
545
// element count = product of all local dimension sizes
chunk->num_elements = 1;
546
for (int i=0;i<args->ldims.ldims_len;i++) {
547
chunk->num_elements *= args->ldims.ldims_val[i].vdata;
549
// deep-copy the offset descriptors (path, name, value) into the chunk
chunk->offset_path = (char **)calloc(args->offsets.offsets_len, sizeof(char *));
550
chunk->offset_name = (char **)calloc(args->offsets.offsets_len, sizeof(char *));
551
chunk->offset = (uint64_t *)calloc(args->offsets.offsets_len, sizeof(uint64_t));
552
for (int i=0;i<args->offsets.offsets_len;i++) {
553
chunk->offset_path[i] = strdup(args->offsets.offsets_val[i].vpath);
554
chunk->offset_name[i] = strdup(args->offsets.offsets_val[i].vname);
555
chunk->offset[i] = args->offsets.offsets_val[i].vdata;
557
// deep-copy the local-dimension descriptors into the chunk
chunk->count_path = (char **)calloc(args->ldims.ldims_len, sizeof(char *));
558
chunk->count_name = (char **)calloc(args->ldims.ldims_len, sizeof(char *));
559
chunk->count = (uint64_t *)calloc(args->ldims.ldims_len, sizeof(uint64_t));;
560
for (int i=0;i<args->ldims.ldims_len;i++) {
561
chunk->count_path[i] = strdup(args->ldims.ldims_val[i].vpath);
562
chunk->count_name[i] = strdup(args->ldims.ldims_val[i].vname);
563
chunk->count[i] = args->ldims.ldims_val[i].vdata;
568
if (DEBUG>2) printf("server_rank(%d) writing scalar vname(%s) vsize(%ld)\n", global_rank, args->vname, args->vsize);
569
Func_Timer("adios_set_path_var", adios_set_path_var(args->fd, args->vpath, args->vname););
570
Func_Timer("adios_write", adios_write(args->fd, args->vname, v););
575
res.bytes_written=args->vsize;
579
/* send result to client */
580
rc = nssi_send_result(caller, request_id, rc, &res, res_addr);
582
if (DEBUG>2) printf("myrank(%d): exit nssi_staging_write_stub(fd=%ld, vsize=%ld)\n", grank, args->fd, args->vsize);
587
/*
 * Server-side stub for ADIOS_START_CALC_OP.
 *
 * Flushes the aggregated chunks of args->fd with agg_and_write() and sends
 * the return code (no payload) back to the caller.
 */
int nssi_staging_start_calc_stub(
588
const unsigned long request_id,
589
const nssi_remote_pid *caller,
590
const adios_start_calc_args *args,
591
const nssi_rma *data_addr,
592
const nssi_rma *res_addr)
596
if (DEBUG>2) printf("myrank(%d): enter nssi_staging_start_calc_stub(%ld)\n", grank, args->fd);
598
agg_and_write(args->fd);
600
/* send result to client */
601
rc = nssi_send_result(caller, request_id, rc, NULL, res_addr);
603
if (DEBUG>2) printf("myrank(%d): exit nssi_staging_start_calc_stub(%ld)\n", grank, args->fd);
608
/*
 * Server-side stub for ADIOS_STOP_CALC_OP.
 *
 * The visible lines only log entry/exit and send the return code (no
 * payload) back to the caller.
 */
int nssi_staging_stop_calc_stub(
609
const unsigned long request_id,
610
const nssi_remote_pid *caller,
611
const adios_stop_calc_args *args,
612
const nssi_rma *data_addr,
613
const nssi_rma *res_addr)
617
if (DEBUG>2) printf("myrank(%d): enter nssi_staging_stop_calc_stub(%ld)\n", grank, args->fd);
620
/* send result to client */
621
rc = nssi_send_result(caller, request_id, rc, NULL, res_addr);
623
if (DEBUG>2) printf("myrank(%d): exit nssi_staging_stop_calc_stub(%ld)\n", grank, args->fd);
628
/*
 * Server-side stub for ADIOS_END_ITER_OP.
 *
 * The visible lines only log entry/exit and send the return code (no
 * payload) back to the caller.
 */
int nssi_staging_end_iter_stub(
629
const unsigned long request_id,
630
const nssi_remote_pid *caller,
631
const adios_end_iter_args *args,
632
const nssi_rma *data_addr,
633
const nssi_rma *res_addr)
637
if (DEBUG>2) printf("myrank(%d): enter nssi_staging_end_iter_stub(%ld)\n", grank, args->fd);
640
/* send result to client */
641
rc = nssi_send_result(caller, request_id, rc, NULL, res_addr);
643
if (DEBUG>2) printf("myrank(%d): exit nssi_staging_end_iter_stub(%ld)\n", grank, args->fd);
648
/*
 * Server-side stub for ADIOS_FINALIZE_OP.
 *
 * The visible lines log entry/exit with args->client_id and send the return
 * code (no payload) back to the caller.
 * NOTE(review): several lines between the entry log and the send are not
 * visible in this chunk.
 */
int nssi_staging_finalize_stub(
649
const unsigned long request_id,
650
const nssi_remote_pid *caller,
651
const adios_finalize_args *args,
652
const nssi_rma *data_addr,
653
const nssi_rma *res_addr)
657
if (DEBUG>2) printf("myrank(%d): enter nssi_staging_finalize_stub(%s)\n", grank, args->client_id);
665
/* send result to client */
666
rc = nssi_send_result(caller, request_id, rc, NULL, res_addr);
668
if (DEBUG>2) printf("myrank(%d): exit nssi_staging_finalize_stub(%s)\n", grank, args->client_id);
673
/* -------- END SERVER-SIDE STUBS -------------- */
675
/*
 * Initialise ADIOS from adios_config_file and register one server stub per
 * staging operation (open, group size, read, write, end iter, start/stop
 * calc, close, finalize) with the NSSI service layer.
 *
 * NOTE(review): the return value and the handling after a failed
 * adios_init() are not visible in this chunk.
 */
int nssi_staging_server_init(const char *adios_config_file)
679
if (DEBUG>3) printf("start adios_init(%s)\n", adios_config_file);
680
rc = adios_init(adios_config_file);
682
printf("adios_init() failed: %d\n", rc);
685
if (DEBUG>3) printf("end adios_init(%s)\n", adios_config_file);
688
/* register server stubs */
// each entry binds an opcode to its stub plus the argument and result types
689
NSSI_REGISTER_SERVER_STUB(ADIOS_OPEN_OP, nssi_staging_open_stub, adios_open_args, adios_open_res);
690
NSSI_REGISTER_SERVER_STUB(ADIOS_GROUP_SIZE_OP, nssi_staging_group_size_stub, adios_group_size_args, void);
691
NSSI_REGISTER_SERVER_STUB(ADIOS_READ_OP, nssi_staging_read_stub, adios_read_args, adios_read_res);
692
NSSI_REGISTER_SERVER_STUB(ADIOS_WRITE_OP, nssi_staging_write_stub, adios_write_args, adios_write_res);
693
NSSI_REGISTER_SERVER_STUB(ADIOS_END_ITER_OP, nssi_staging_end_iter_stub, adios_end_iter_args, void);
694
NSSI_REGISTER_SERVER_STUB(ADIOS_START_CALC_OP, nssi_staging_start_calc_stub, adios_start_calc_args, void);
695
NSSI_REGISTER_SERVER_STUB(ADIOS_STOP_CALC_OP, nssi_staging_stop_calc_stub, adios_stop_calc_args, void);
696
NSSI_REGISTER_SERVER_STUB(ADIOS_CLOSE_OP, nssi_staging_close_stub, adios_close_args, void);
697
NSSI_REGISTER_SERVER_STUB(ADIOS_FINALIZE_OP, nssi_staging_finalize_stub, adios_finalize_args, void);
704
/*
 * Publish the contact information of all server processes.
 *
 * Gathers every rank's nssi_remote_pid over MPI_COMM_WORLD, then writes one
 * "nid@pid@hostname@port" line per rank to the file named by the
 * ADIOS_NSSI_CONTACT_INFO environment variable, and finally synchronises all
 * ranks with a barrier.
 *
 * myid: this process's id, contributed to the gather.
 *
 * NOTE(review): the root of the gather, the rank guard around the file
 * write, the check of fopen()'s result and the fclose() are not visible in
 * this chunk.
 */
static void generate_contact_info(nssi_remote_pid *myid)
706
nssi_remote_pid *all_pids=NULL;
708
char contact_path[1024];
710
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
711
//printf("rank (%d)\n", rank);
714
MPI_Comm_size(MPI_COMM_WORLD, &np);
715
all_pids=(nssi_remote_pid *)malloc(np*sizeof(nssi_remote_pid));
717
MPI_Gather(myid, sizeof(nssi_remote_pid), MPI_BYTE,
718
all_pids, sizeof(nssi_remote_pid), MPI_BYTE,
721
char *contact_file=getenv("ADIOS_NSSI_CONTACT_INFO");
722
if (contact_file==NULL) {
723
printf("ADIOS_NSSI_CONTACT_INFO env var is undefined.\n");
727
// sprintf(contact_path, "%s.%04d", contact_file, rank);
728
// The path comes from the environment and has no length limit; bound the
// copy so an over-long value cannot overflow contact_path.
snprintf(contact_path, sizeof(contact_path), "%s", contact_file);
729
if (DEBUG>3) printf("creating contact file (%s)\n", contact_path);
730
FILE *f=fopen(contact_path, "w");
734
for (int i=0;i<np;i++) {
735
fprintf(f, "%u@%u@%s@%u\n",
736
all_pids[i].nid, all_pids[i].pid,
737
all_pids[i].hostname, (unsigned int)ntohs(all_pids[i].port));
739
// fprintf(f, "%u@%u@%s@%u\n",
740
// myid->nid, myid->pid,
741
// myid->hostname, (unsigned int)ntohs(myid->port));
745
MPI_Barrier(MPI_COMM_WORLD);
750
* @brief Entry point of the ADIOS NSSI staging server.
752
/*
 * Start-up sequence visible here: initialise MPI and record this process's
 * rank and the process count; initialise the transport (Portals, or
 * InfiniBand under HAVE_INFINIBAND) and the RPC layer; publish contact
 * information; initialise the NSSI service and the staging stubs (ADIOS
 * config file taken from argv[1]); serve requests until
 * nssi_service_start() returns; then finalise ADIOS and the service.
 *
 * NOTE(review): the conditional that selects the Portals section, the
 * command-line parsing that fills args_info and num_threads, the error
 * branches and the return statement are not visible in this chunk.
 */
int main(int argc, char **argv)
756
nssi_service nssi_svc;
757
// log_level debug_level;
761
MPI_Init(&argc, &argv);
762
// global_rank is the file-scope copy used by the stubs' log messages
MPI_Comm_rank(MPI_COMM_WORLD, &global_rank);
763
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
764
MPI_Comm_size(MPI_COMM_WORLD, &np);
768
/* options that can be overridden by the command-line */
769
bool daemon_flag = false;
770
int verbose = 5; /* default debug_level */
772
int server_pid = 128; /* process ID of the server */
773
int server_port = 7728; /* TCP port of the server */
775
memset(&nssi_svc, 0, sizeof(nssi_service));
777
/* initialize and enable logging */
778
// if (args_info.logfile_arg != NULL) {
779
// sprintf(logfile, "%s.%04d", "nssi_staging_server.log", rank);
780
// logger_init((log_level)verbose, logfile);
782
// logger_init((log_level)verbose, NULL);
784
// netcdf_debug_level=(log_level)(log_level)args_info.verbose_arg;
785
// debug_level = (log_level)args_info.verbose_arg;
787
// logger_init((log_level)verbose, NULL);
794
// Portals transport: initialise, then publish this process's id
nssi_ptl_init(PTL_IFACE_SERVER, server_pid);
795
rc = nssi_rpc_init(NSSI_RPC_PTL, NSSI_RPC_XDR);
797
printf("could not init rpc: %s\n",
801
nssi_remote_pid myid;
802
memset(&myid, 0, sizeof(nssi_remote_pid));
803
nssi_ptl_get_id(&myid);
804
generate_contact_info(&myid);
806
#ifdef HAVE_INFINIBAND
807
// InfiniBand transport: the match id is filled from the parsed arguments
memset(&nssi_svc.req_addr.match_id, 0, sizeof(nssi_remote_pid));
808
strcpy(nssi_svc.req_addr.match_id.hostname, args_info.server_addr_arg);
809
nssi_svc.req_addr.match_id.port = args_info.server_port_arg;
811
nssi_ib_init(&nssi_svc);
812
rc = nssi_rpc_init(NSSI_RPC_IB, NSSI_RPC_XDR);
814
printf("could not init rpc: %s\n",
818
generate_contact_info(&nssi_svc.req_addr.match_id);
821
if (DEBUG>3) printf("Initialize staging service\n");
823
/* initialize the nssi service */
824
rc = nssi_service_init(0, NSSI_SHORT_REQUEST_SIZE, &nssi_svc);
826
printf("could not init nssi_svc: %s\n",
831
/* initialize staging service */
// NOTE(review): no argc check before argv[1] is visible here.
832
rc = nssi_staging_server_init(argv[1]);
834
/* start processing requests */
835
nssi_svc.max_reqs = -1;
836
rc = nssi_service_start(&nssi_svc, num_threads);
838
printf("exited nssi_svc: %s\n",
842
adios_finalize(rank);
844
/* shutdown the nssi_svc */
845
if (DEBUG>3) printf("shutting down service library\n");
846
nssi_service_fini(&nssi_svc);