~ubuntu-branches/ubuntu/vivid/mpich/vivid-proposed

« back to all changes in this revision

Viewing changes to src/mpid/pamid/src/dyntask/mpid_comm_spawn_multiple.c

  • Committer: Package Import Robot
  • Author(s): Anton Gladky
  • Date: 2014-04-01 20:24:20 UTC
  • mfrom: (5.2.4 sid)
  • Revision ID: package-import@ubuntu.com-20140401202420-t5ey1ia2klt5dkq3
Tags: 3.1-4
* [c3e3398] Disable test_primitives, which is unreliable on some platforms.
            (Closes: #743047)
* [265a699] Add minimal autotest.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* -*- Mode: C; c-basic-offset:4 ; -*- */
 
2
/*
 
3
 *  (C) 2001 by Argonne National Laboratory.
 
4
 *      See COPYRIGHT in top-level directory.
 
5
 */
 
6
 
 
7
#include "mpidimpl.h"
 
8
#ifdef USE_PMI2_API
 
9
#include "pmi2.h"
 
10
#else
 
11
#include "pmi.h"
 
12
#endif
 
13
 
 
14
#ifdef DYNAMIC_TASKING
 
15
 
 
16
extern mpidi_dynamic_tasking;
 
17
 
 
18
/* Define the name of the kvs key used to provide the port name to the
 
19
   children */
 
20
#define MPIDI_PARENT_PORT_KVSKEY "PARENT_ROOT_PORT_NAME"
 
21
 
 
22
/* FIXME: We can avoid these two routines if we define PMI as using
 
23
   MPI info values */
 
24
/* Turn a SINGLE MPI_Info into an array of PMI_keyvals (return the pointer
 
25
   to the array of PMI keyvals) */
 
26
static int  MPIDI_mpi_to_pmi_keyvals( MPID_Info *info_ptr, PMI_keyval_t **kv_ptr, int *nkeys_ptr )
 
27
{
 
28
    char key[MPI_MAX_INFO_KEY];
 
29
    PMI_keyval_t *kv = 0;
 
30
    int          i, nkeys = 0, vallen, flag, mpi_errno=MPI_SUCCESS;
 
31
 
 
32
    if (!info_ptr || info_ptr->handle == MPI_INFO_NULL) {
 
33
        goto fn_exit;
 
34
    }
 
35
 
 
36
    MPIR_Info_get_nkeys_impl( info_ptr, &nkeys );
 
37
    if (nkeys == 0) {
 
38
        goto fn_exit;
 
39
    }
 
40
    kv = (PMI_keyval_t *)MPIU_Malloc( nkeys * sizeof(PMI_keyval_t) );
 
41
 
 
42
    for (i=0; i<nkeys; i++) {
 
43
        mpi_errno = MPIR_Info_get_nthkey_impl( info_ptr, i, key );
 
44
        if (mpi_errno)
 
45
          TRACE_ERR("MPIR_Info_get_nthkey_impl returned with mpi_errno=%d\n", mpi_errno);
 
46
        MPIR_Info_get_valuelen_impl( info_ptr, key, &vallen, &flag );
 
47
 
 
48
        kv[i].key = MPIU_Strdup(key);
 
49
        kv[i].val = MPIU_Malloc( vallen + 1 );
 
50
        MPIR_Info_get_impl( info_ptr, key, vallen+1, kv[i].val, &flag );
 
51
        TRACE_OUT("key: <%s>, value: <%s>\n", kv[i].key, kv[i].val);
 
52
    }
 
53
 
 
54
 fn_fail:
 
55
 fn_exit:
 
56
    *kv_ptr    = kv;
 
57
    *nkeys_ptr = nkeys;
 
58
    return mpi_errno;
 
59
}
 
60
 
 
61
 
 
62
/* Free the entire array of PMI keyvals */
 
63
static void MPIDI_free_pmi_keyvals(PMI_keyval_t **kv, int size, int *counts)
 
64
{
 
65
    int i,j;
 
66
 
 
67
    for (i=0; i<size; i++)
 
68
    {
 
69
        for (j=0; j<counts[i]; j++)
 
70
        {
 
71
            if (kv[i][j].key != NULL)
 
72
                MPIU_Free((char *)kv[i][j].key);
 
73
            if (kv[i][j].val != NULL)
 
74
                MPIU_Free(kv[i][j].val);
 
75
        }
 
76
        if (kv[i] != NULL)
 
77
        {
 
78
            MPIU_Free(kv[i]);
 
79
        }
 
80
    }
 
81
}
 
82
 
 
83
/*@
 
84
   MPID_Comm_spawn_multiple -
 
85
 
 
86
   Input Arguments:
 
87
+  int count - count
 
88
.  char *array_of_commands[] - commands
 
89
.  char* *array_of_argv[] - arguments
 
90
.  int array_of_maxprocs[] - maxprocs
 
91
.  MPI_Info array_of_info[] - infos
 
92
.  int root - root
 
93
-  MPI_Comm comm - communicator
 
94
 
 
95
   Output Arguments:
 
96
+  MPI_Comm *intercomm - intercommunicator
 
97
-  int array_of_errcodes[] - error codes
 
98
 
 
99
   Notes:
 
100
 
 
101
.N Errors
 
102
.N MPI_SUCCESS
 
103
@*/
 
104
#undef FUNCNAME
 
105
#define FUNCNAME MPID_Comm_spawn_multiple
 
106
#undef FCNAME
 
107
#define FCNAME MPIU_QUOTE(FUNCNAME)
 
108
int MPID_Comm_spawn_multiple(int count, char *array_of_commands[],
 
109
                             char ** array_of_argv[], const int array_of_maxprocs[],
 
110
                             MPID_Info * array_of_info_ptrs[], int root,
 
111
                             MPID_Comm * comm_ptr, MPID_Comm ** intercomm,
 
112
                             int array_of_errcodes[])
 
113
{
 
114
    int mpi_errno = MPI_SUCCESS;
 
115
 
 
116
    if(mpidi_dynamic_tasking == 0) {
 
117
        fprintf(stderr, "Received spawn request for non-dynamic jobs\n");
 
118
        MPIU_ERR_SETANDSTMT(mpi_errno, MPI_ERR_SPAWN,
 
119
                            return mpi_errno, "**spawn");
 
120
    }
 
121
 
 
122
    /* We allow an empty implementation of this function to
 
123
       simplify building MPICH on systems that have difficulty
 
124
       supporing process creation */
 
125
    mpi_errno = MPIDI_Comm_spawn_multiple(count, array_of_commands,
 
126
                                          array_of_argv, array_of_maxprocs,
 
127
                                          array_of_info_ptrs,
 
128
                                          root, comm_ptr, intercomm,
 
129
                                          array_of_errcodes);
 
130
    return mpi_errno;
 
131
}
 
132
 
 
133
 
 
134
/*
 
135
 * MPIDI_Comm_spawn_multiple()
 
136
 */
 
137
int MPIDI_Comm_spawn_multiple(int count, char **commands,
 
138
                              char ***argvs, int *maxprocs,
 
139
                              MPID_Info **info_ptrs, int root,
 
140
                              MPID_Comm *comm_ptr, MPID_Comm
 
141
                              **intercomm, int *errcodes)
 
142
{
 
143
    char port_name[MPI_MAX_PORT_NAME];
 
144
    char jobId[64];
 
145
    char ctxid_str[16];
 
146
    int jobIdSize = 64;
 
147
    int len=0;
 
148
    int *info_keyval_sizes=0, i, mpi_errno=MPI_SUCCESS;
 
149
    PMI_keyval_t **info_keyval_vectors=0, preput_keyval_vector;
 
150
    int *pmi_errcodes = 0, pmi_errno=0;
 
151
    int total_num_processes, should_accept = 1;
 
152
    MPID_Info tmp_info_ptr;
 
153
    char *tmp;
 
154
    int tmp_ret = 0;
 
155
 
 
156
    if (comm_ptr->rank == root) {
 
157
        /* create an array for the pmi error codes */
 
158
        total_num_processes = 0;
 
159
        for (i=0; i<count; i++) {
 
160
            total_num_processes += maxprocs[i];
 
161
        }
 
162
        pmi_errcodes = (int*)MPIU_Malloc(sizeof(int) * total_num_processes);
 
163
 
 
164
        /* initialize them to 0 */
 
165
        for (i=0; i<total_num_processes; i++)
 
166
            pmi_errcodes[i] = 0;
 
167
 
 
168
        /* Open a port for the spawned processes to connect to */
 
169
        /* FIXME: info may be needed for port name */
 
170
        mpi_errno = MPID_Open_port(NULL, port_name);
 
171
        TRACE_ERR("mpi_errno from MPID_Open_port=%d\n", mpi_errno);
 
172
 
 
173
        /* Spawn the processes */
 
174
#ifdef USE_PMI2_API
 
175
        MPIU_Assert(count > 0);
 
176
        {
 
177
            int *argcs = MPIU_Malloc(count*sizeof(int));
 
178
            struct MPID_Info preput;
 
179
            struct MPID_Info *preput_p[2] = { &preput, &tmp_info_ptr };
 
180
 
 
181
            MPIU_Assert(argcs);
 
182
 
 
183
            info_keyval_sizes = MPIU_Malloc(count * sizeof(int));
 
184
 
 
185
            /* FIXME cheating on constness */
 
186
            preput.key = (char *)MPIDI_PARENT_PORT_KVSKEY;
 
187
            preput.value = port_name;
 
188
            preput.next = &tmp_info_ptr;
 
189
 
 
190
            tmp_info_ptr.key = "COMMCTX";
 
191
            len=sprintf(ctxid_str, "%d", comm_ptr->context_id);
 
192
            TRACE_ERR("COMMCTX=%d\n", comm_ptr->context_id);
 
193
            ctxid_str[len]='\0';
 
194
            tmp_info_ptr.value = ctxid_str;
 
195
            tmp_info_ptr.next = NULL;
 
196
 
 
197
            /* compute argcs array */
 
198
            for (i = 0; i < count; ++i) {
 
199
                argcs[i] = 0;
 
200
                if (argvs != NULL && argvs[i] != NULL) {
 
201
                    while (argvs[i][argcs[i]]) {
 
202
                        ++argcs[i];
 
203
                    }
 
204
                }
 
205
            }
 
206
 
 
207
            /*MPIU_THREAD_CS_ENTER(PMI,);*/
 
208
            /* release the global CS for spawn PMI calls */
 
209
            MPIU_THREAD_CS_EXIT(ALLFUNC,);
 
210
            pmi_errno = PMI2_Job_Spawn(count, (const char **)commands,
 
211
                                       argcs, (const char ***)argvs,
 
212
                                       maxprocs,
 
213
                                       info_keyval_sizes, (const MPID_Info **)info_ptrs,
 
214
                                       2, (const struct MPID_Info **)preput_p,
 
215
                                       jobId, jobIdSize,
 
216
                                       pmi_errcodes);
 
217
            TRACE_ERR("after PMI2_Job_Spawn - pmi_errno=%d jobId=%s\n", pmi_errno, jobId);
 
218
            MPIU_THREAD_CS_ENTER(ALLFUNC,);
 
219
 
 
220
            tmp=MPIU_Strdup(jobId);
 
221
            tmp_ret = atoi(strtok(tmp, ";"));
 
222
 
 
223
            if( (pmi_errno == PMI2_SUCCESS) && (tmp_ret != -1) ) {
 
224
              pami_task_t leader_taskid = atoi(strtok(NULL, ";"));
 
225
              pami_endpoint_t ldest;
 
226
 
 
227
              PAMI_Endpoint_create(MPIDI_Client,  leader_taskid, 0, &ldest);
 
228
              TRACE_ERR("PAMI_Resume to taskid=%d\n", leader_taskid);
 
229
              PAMI_Resume(MPIDI_Context[0], &ldest, 1);
 
230
            }
 
231
 
 
232
            MPIU_Free(tmp);
 
233
 
 
234
            MPIU_Free(argcs);
 
235
            if (pmi_errno != PMI2_SUCCESS) {
 
236
               TRACE_ERR("PMI2_Job_Spawn returned with pmi_errno=%d\n", pmi_errno);
 
237
            }
 
238
        }
 
239
#else
 
240
        /* FIXME: This is *really* awkward.  We should either
 
241
           Fix on MPI-style info data structures for PMI (avoid unnecessary
 
242
           duplication) or add an MPIU_Info_getall(...) that creates
 
243
           the necessary arrays of key/value pairs */
 
244
 
 
245
        /* convert the infos into PMI keyvals */
 
246
        info_keyval_sizes   = (int *) MPIU_Malloc(count * sizeof(int));
 
247
        info_keyval_vectors =
 
248
            (PMI_keyval_t**) MPIU_Malloc(count * sizeof(PMI_keyval_t*));
 
249
 
 
250
        if (!info_ptrs) {
 
251
            for (i=0; i<count; i++) {
 
252
                info_keyval_vectors[i] = 0;
 
253
                info_keyval_sizes[i]   = 0;
 
254
            }
 
255
        }
 
256
        else {
 
257
            for (i=0; i<count; i++) {
 
258
                mpi_errno = MPIDI_mpi_to_pmi_keyvals( info_ptrs[i],
 
259
                                                &info_keyval_vectors[i],
 
260
                                                &info_keyval_sizes[i] );
 
261
                if (mpi_errno) { TRACE_ERR("MPIDI_mpi_to_pmi_keyvals returned with mpi_errno=%d\n", mpi_errno); }
 
262
            }
 
263
        }
 
264
 
 
265
        preput_keyval_vector.key = MPIDI_PARENT_PORT_KVSKEY;
 
266
        preput_keyval_vector.val = port_name;
 
267
 
 
268
        pmi_errno = PMI_Spawn_multiple(count, (const char **)
 
269
                                       commands,
 
270
                                       (const char ***) argvs,
 
271
                                       maxprocs, info_keyval_sizes,
 
272
                                       (const PMI_keyval_t **)
 
273
                                       info_keyval_vectors, 1,
 
274
                                       &preput_keyval_vector,
 
275
                                       pmi_errcodes);
 
276
        TRACE_ERR("pmi_errno from PMI_Spawn_multiple=%d\n", pmi_errno);
 
277
#endif
 
278
 
 
279
        if (errcodes != MPI_ERRCODES_IGNORE) {
 
280
            for (i=0; i<total_num_processes; i++) {
 
281
                /* FIXME: translate the pmi error codes here */
 
282
                errcodes[i] = pmi_errcodes[0];
 
283
                /* We want to accept if any of the spawns succeeded.
 
284
                   Alternatively, this is the same as we want to NOT accept if
 
285
                   all of them failed.  should_accept = NAND(e_0, ..., e_n)
 
286
                   Remember, success equals false (0). */
 
287
                should_accept = should_accept && errcodes[i];
 
288
            }
 
289
            should_accept = !should_accept; /* the `N' in NAND */
 
290
        }
 
291
 
 
292
#ifdef USE_PMI2_API
 
293
        if( (pmi_errno == PMI2_SUCCESS) && (tmp_ret == -1) )
 
294
#else
 
295
        if( (pmi_errno == PMI_SUCCESS) && (tmp_ret == -1) )
 
296
#endif
 
297
          should_accept = 0;
 
298
    }
 
299
 
 
300
    if (errcodes != MPI_ERRCODES_IGNORE) {
 
301
        int errflag = FALSE;
 
302
        mpi_errno = MPIR_Bcast_impl(&should_accept, 1, MPI_INT, root, comm_ptr, &errflag);
 
303
        if (mpi_errno) TRACE_ERR("MPIR_Bcast_impl returned with mpi_errno=%d\n", mpi_errno);
 
304
 
 
305
        mpi_errno = MPIR_Bcast_impl(&pmi_errno, 1, MPI_INT, root, comm_ptr, &errflag);
 
306
        if (mpi_errno) TRACE_ERR("MPIR_Bcast_impl returned with mpi_errno=%d\n", mpi_errno);
 
307
 
 
308
        mpi_errno = MPIR_Bcast_impl(&total_num_processes, 1, MPI_INT, root, comm_ptr, &errflag);
 
309
        if (mpi_errno) TRACE_ERR("MPIR_Bcast_impl returned with mpi_errno=%d\n", mpi_errno);
 
310
 
 
311
        mpi_errno = MPIR_Bcast_impl(errcodes, total_num_processes, MPI_INT, root, comm_ptr, &errflag);
 
312
        if (mpi_errno) TRACE_ERR("MPIR_Bcast_impl returned with mpi_errno=%d\n", mpi_errno);
 
313
    }
 
314
 
 
315
    if (should_accept) {
 
316
        mpi_errno = MPID_Comm_accept(port_name, NULL, root, comm_ptr, intercomm);
 
317
        TRACE_ERR("mpi_errno from MPID_Comm_accept=%d\n", mpi_errno);
 
318
    } else {
 
319
        if( (pmi_errno == PMI2_SUCCESS) && (errcodes[0] != 0) ) {
 
320
          MPIR_Comm_create(intercomm);
 
321
        }
 
322
    }
 
323
 
 
324
    if (comm_ptr->rank == root) {
 
325
        /* Close the port opened for the spawned processes to connect to */
 
326
        mpi_errno = MPID_Close_port(port_name);
 
327
        /* --BEGIN ERROR HANDLING-- */
 
328
        if (mpi_errno != MPI_SUCCESS)
 
329
            TRACE_ERR("MPID_Close_port returned with mpi_errno=%d\n", mpi_errno);
 
330
        /* --END ERROR HANDLING-- */
 
331
    }
 
332
 
 
333
    if(pmi_errno) {
 
334
           mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, __FILE__, __LINE__, MPI_ERR_SPAWN,
 
335
            "**mpi_comm_spawn", 0);
 
336
    }
 
337
 
 
338
 fn_exit:
 
339
    if (info_keyval_vectors) {
 
340
        MPIDI_free_pmi_keyvals(info_keyval_vectors, count, info_keyval_sizes);
 
341
        MPIU_Free(info_keyval_vectors);
 
342
    }
 
343
    if (info_keyval_sizes) {
 
344
        MPIU_Free(info_keyval_sizes);
 
345
    }
 
346
    if (pmi_errcodes) {
 
347
        MPIU_Free(pmi_errcodes);
 
348
    }
 
349
    return mpi_errno;
 
350
 fn_fail:
 
351
    goto fn_exit;
 
352
}
 
353
 
 
354
 
 
355
/* This function is used only with mpid_init to set up the parent communicator
 
356
   if there is one.  The routine should be in this file because the parent
 
357
   port name is setup with the "preput" arguments to PMI_Spawn_multiple */
 
358
static char *parent_port_name = 0;    /* Name of parent port if this
 
359
                                         process was spawned (and is root
 
360
                                         of comm world) or null */
 
361
#undef FUNCNAME
 
362
#define FUNCNAME MPIDI_GetParentPort
 
363
#undef FCNAME
 
364
#define FCNAME MPIU_QUOTE(FUNCNAME)
 
365
int MPIDI_GetParentPort(char ** parent_port)
 
366
{
 
367
    int mpi_errno = MPI_SUCCESS;
 
368
    int pmi_errno;
 
369
    char val[MPIDI_MAX_KVS_VALUE_LEN];
 
370
 
 
371
    if (parent_port_name == NULL)
 
372
    {
 
373
        char *kvsname = NULL;
 
374
        /* We can always use PMI_KVS_Get on our own process group */
 
375
        MPIDI_PG_GetConnKVSname( &kvsname );
 
376
#ifdef USE_PMI2_API
 
377
        {
 
378
            int vallen = 0;
 
379
            pmi_errno = PMI2_KVS_Get(kvsname, PMI2_ID_NULL, MPIDI_PARENT_PORT_KVSKEY, val, sizeof(val), &vallen);
 
380
            TRACE_ERR("PMI2_KVS_Get - val=%s\n", val);
 
381
            if (pmi_errno)
 
382
                TRACE_ERR("PMI2_KVS_Get returned with pmi_errno=%d\n", pmi_errno);
 
383
        }
 
384
#else
 
385
        /*MPIU_THREAD_CS_ENTER(PMI,);*/
 
386
        pmi_errno = PMI_KVS_Get( kvsname, MPIDI_PARENT_PORT_KVSKEY, val, sizeof(val));
 
387
/*      MPIU_THREAD_CS_EXIT(PMI,);*/
 
388
        if (pmi_errno) {
 
389
            mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**pmi_kvsget", "**pmi_kvsget %d", pmi_errno);
 
390
            goto fn_exit;
 
391
        }
 
392
#endif
 
393
        parent_port_name = MPIU_Strdup(val);
 
394
    }
 
395
 
 
396
    *parent_port = parent_port_name;
 
397
 
 
398
 fn_exit:
 
399
    return mpi_errno;
 
400
 fn_fail:
 
401
    goto fn_exit;
 
402
}
 
403
 
 
404
 
 
405
void MPIDI_FreeParentPort(void)
 
406
{
 
407
    if (parent_port_name) {
 
408
        MPIU_Free( parent_port_name );
 
409
        parent_port_name = 0;
 
410
    }
 
411
}
 
412
 
 
413
 
 
414
#endif