3
"$Id: pvmdmimd.c,v 1.10 1997/10/01 15:36:06 pvmsrc Exp $";
6
* PVM version 3.4: Parallel Virtual Machine System
7
* University of Tennessee, Knoxville TN.
8
* Oak Ridge National Laboratory, Oak Ridge TN.
9
* Emory University, Atlanta GA.
10
* Authors: J. J. Dongarra, G. E. Fagg, M. Fischer
11
* G. A. Geist, J. A. Kohl, R. J. Manchek, P. Mucci,
12
* P. M. Papadopoulos, S. L. Scott, and V. S. Sunderam
13
* (C) 1997 All Rights Reserved
17
* Permission to use, copy, modify, and distribute this software and
18
* its documentation for any purpose and without fee is hereby granted
19
* provided that the above copyright notice appear in all copies and
20
* that both the copyright notice and this permission notice appear in
21
* supporting documentation.
23
* Neither the Institutions (Emory University, Oak Ridge National
24
* Laboratory, and University of Tennessee) nor the Authors make any
25
* representations about the suitability of this software for any
26
* purpose. This software is provided ``as is'' without express or
29
* PVM version 3 was funded in part by the U.S. Department of Energy,
30
* the National Science Foundation and the State of Tennessee.
38
* void mpp_init(int argc, char **argv):
39
* Initialization. Create a table to keep track of active nodes.
40
* argc, argv: passed from main.
42
* int mpp_load(int flags, char *name, char *argv, int count, int tids[],
44
* Load executable onto nodes; create new entries in task table,
45
* encode node number and process type into task IDs, etc.
47
* Construction of Task ID:
49
* 1 0 9 8 7 6 5 4 3 2 1 0 9 8 7 6 5 4 3 2 1 0 9 8 7 6 5 4 3 2 1
50
* +-+-+---------------------+-+-----------+---------------------+
51
* |s|g| host index |n| instance | node # (2048) |
52
* +-+-+---------------------+-+-----------+---------------------+
54
* The "n" bit is set for node task but clear for host task.
56
* flags: exec options;
57
* name: executable to be loaded;
58
* argv: command line argument for executable
59
* count: number of tasks to be created;
60
* tids: array to store new task IDs;
61
* ptid: parent task ID.
63
* mpp_new(int count, int ptid):
64
* Allocate a set of nodes. (called by mpp_load())
65
* count: number of nodes; ptid: parent task ID.
68
* Send all pending packets to nodes via native send. Node number
69
* and process type are extracted from task ID.
71
* int mpp_mcast(int src, struct pkt pp, int tids[], int ntask):
73
* src: source task ID;
75
* tids: list of destination task IDs;
79
* Probe for pending packets from nodes (non-blocking). Returns
80
* 1 if packets are detected, otherwise 0.
83
* Receive pending packets (from nodes) via native recv.
85
* struct task *mpp_find(int pid):
86
* Find a task in task table by its Unix pid.
88
* void mpp_free(struct task *tp):
89
* Remove node/process-type from active list.
93
* Revision 1.10 1997/10/01 15:36:06 pvmsrc
94
* Removed unnecessary #include "fromlib.h" header.
95
* - all consts now included in pvm3.h...
96
* - header file eliminated.
99
* Revision 1.9 1997/08/27 20:18:58 pvmsrc
100
* Added blank args (0,0,0) in mpp_load() to make it call forkexec correctly.
101
* Protocol needs to be changed to allow taskers to access IBM poe directly.
104
* Revision 1.8 1997/07/09 13:54:58 pvmsrc
105
* Fixed Author Header.
107
* Revision 1.7 1997/06/02 13:48:38 pvmsrc
108
* Moved #include host.h above #include waitc.h.
109
* Removed old mesg.h include... gone baby.
111
* Revision 1.6 1997/05/06 20:14:36 pvmsrc
112
* Catch stdout/stderr correctly and redirect to outtid,tag,ctx
114
* Revision 1.5 1997/05/05 20:08:56 pvmsrc
115
* Pass outctx and trcctx to mpp tasks.
117
* Revision 1.4 1997/05/02 13:52:50 pvmsrc
118
* Start up MPI jobs correctly and get them configured.
120
* Revision 1.3 1997/03/25 15:52:21 pvmsrc
121
* PVM patches from the base 3.3.10 to 3.3.11 versions where applicable.
122
* Originals by Bob Manchek. Altered by Graham Fagg where required.
123
* -IP enabled over switch
124
* -RMPOOL env can be used instead of host list
126
* Revision 1.2 1997/01/28 19:30:57 pvmsrc
127
* New Copyright Notice & Authors.
129
* Revision 1.1 1996/09/23 23:15:09 pvmsrc
132
* Revision 1.3 1996/05/14 14:35:59 manchek
133
* inc'd changes from chulho@kgn.ibm.com
135
* Revision 1.2 1995/07/25 17:41:27 manchek
136
* mpp_output returns int
138
* Revision 1.1 1995/05/30 17:23:56 manchek
141
* Revision 1.3 1994/06/03 20:54:24 manchek
144
* Revision 1.2 1993/12/20 15:39:47 manchek
147
* Revision 1.1 1993/08/30 23:35:09 manchek
153
#include <sys/param.h>
154
#include <sys/types.h>
155
#include <sys/time.h>
156
#include <sys/socket.h>
157
#include <netinet/in.h>
158
#include <netinet/tcp.h>
159
#include <sys/stat.h>
167
#define CINDEX(s,c) strchr(s,c)
170
#define CINDEX(s,c) index(s,c)
174
#include <pvmproto.h>
178
#include "pvmalloc.h"
186
/*
 * Pieces of the poe command line that the loader code in this file
 * assembles into its argument vector.
 *
 * MPIOPT1..MPIOPT4 are the option flags; MPIOPARG2 and MPIOPARG3 are the
 * two possible values for the -euilib option: MPIOPARG2 is the compiled-in
 * default, and MPIOPARG3 replaces it when the environment has MP_EUILIB=ip.
 * MPIARGC is added to the user's argument count when sizing that vector.
 */
#define MPICOMM "/usr/bin/poe"
187
#define MPIOPT1 "-procs"
188
#define MPIOPT2 "-euilib"
189
#define MPIOPT3 "-hfile"
190
#define MPIOPT4 "-rmpool"
191
#define MPIOPARG2 "us" /* options: -procs # -euilib us */
192
#define MPIOPARG3 "ip" /* Enable IP over switch */
193
#define MPIARGC 7 /* number of command line arguments */
199
/*
 * Daemon globals defined in other files (see the trailing comment on each
 * line) that this file reads.  Within this file: pvmdebmask gates the debug
 * log messages, hosts is indexed by ht_local to reach the local host
 * descriptor, and locltasks is walked to clean up node tasks after their
 * pvmhost has died.
 */
extern int pvmdebmask; /* from pvmd.c */
200
extern char **epaths; /* from pvmd.c */
201
extern int myhostpart; /* from pvmd.c */
202
extern int tidhmask; /* from pvmd.c */
203
extern int ourudpmtu; /* from pvmd.c */
204
extern struct htab *hosts; /* from pvmd.c */
205
extern struct task *locltasks; /* from task.c */
207
/* tidtmask is shifted down by its lowest set bit to derive ptypemask. */
int tidtmask = TIDPTYPE; /* mask for ptype field of tids */
208
int tidnmask = TIDNODE; /* mask for node field of tids */
212
/*
 * File-private state for the SP2/poe port.
 */
static int myndf = 0; /* sent to pvmhost as sp2pvminfo[4]; NOTE(review): meaning not visible in this file -- confirm */
213
/* Head of a circular list: it is linked to itself at init, and list walks stop on reaching it again. */
static struct nodeset *busynodes; /* active nodes; ordered by proc type */
214
static char pvmtxt[512]; /* scratch for error log */
215
static int ptypemask; /* mask; we use these bits of ptype in tids */
216
static char nodefile[L_tmpnam]; /* tmp node file */
217
static char **nodelist = 0; /* default poe node list */
218
static int partsize = 0; /* number of nodes allocated */
219
static int hostfileused = TRUE; /* Check if MP_HOSTFILE used */
220
/* NOTE(review): MP_RMPOOL is copied in here with strcpy(); no length check is visible -- confirm the 64-byte bound is enforced. */
static char defaultpool[64]="1"; /* default MP_POOL if not set */
221
/* Overwritten in place with MPIOPARG3 when MP_EUILIB=ip; both values are the same length. */
static char mpiadapter[]={MPIOPARG2}; /* default User-Space */
223
static int sp2pvminfo[SIZEHINFO]; /* host info block: slots 0..11 are filled by the loader, then written to pvmhost's socket */
233
char *hfn; /* host file name */
234
char nname[128]; /* node name */
238
if ((hfn = getenv("LOADLBATCH"))) {
239
if (strcmp(hfn, "yes") == 0) {
240
if ((hfn = getenv("LOADL_PROCESSOR_LIST"))) {
242
"LOADL_PROCESSOR_LIST=%s.\n",hfn);
243
pvmlogperror(pvmtxt);
245
for (hfn;*hfn!='\0';hfn++) {
251
"LOADL_PROCESSOR_LIST=%s - is not set\n",hfn);
252
pvmlogperror(pvmtxt);
256
sprintf(pvmtxt,"LOADLBATCH=%s - not set to yes\n",hfn);
257
pvmlogperror(pvmtxt);
260
} else if ((hfn = getenv("MP_PROCS"))) {
261
if ((partsize = atoi(hfn)) < 1) {
262
sprintf(pvmtxt,"MP_PROCS=%d must be >= to 1\n",partsize);
263
pvmlogperror(pvmtxt);
267
if ((hfn = getenv("MP_RMPOOL"))) {
270
"MP_RMPOOL=%d must be >= to 0\n",defaultpool);
271
pvmlogperror(pvmtxt);
274
strcpy(defaultpool,hfn);
276
} else if ((hfn = getenv("MP_HOSTFILE")))
278
if (!(hfp = fopen(hfn, "r"))) {
279
sprintf(pvmtxt, "sp2hostfile() fopen %s\n", hfn);
280
pvmlogperror(pvmtxt);
284
while (fscanf(hfp, "%s", nname) != EOF)
287
nodelist = TALLOC(partsize, char*, "nname");
289
for (i = 0; i < partsize; i++) {
290
fscanf(hfp, "%s", nname);
291
nodelist[i] = STRALLOC(nname);
296
pvmlogerror("mpp_init() no POE host file.\n");
297
pvmlogerror("mpp_init() MP_PROCS, MP_RMPOOL or MP_HOSTFILE must be set.\n");
300
if ((hfn = getenv("MP_EUILIB"))) {
301
if (strcmp(hfn, "ip") == 0) {
302
strcpy(mpiadapter,MPIOPARG3); /* IP over switch */
305
sprintf(pvmtxt, "%d nodes allocated.\n", partsize);
308
busynodes = TALLOC(1, struct nodeset, "nsets");
309
BZERO((char*)busynodes, sizeof(struct nodeset));
310
busynodes->n_link = busynodes;
311
busynodes->n_rlink = busynodes;
313
ptypemask = tidtmask >> (ffs(tidtmask) - 1);
317
/* create tmp poe host file from default */
319
sp2hostfile(first, count)
320
int first; /* first node in the set */
321
int count; /* number of nodes requested */
326
if (partsize < count) {
327
sprintf(pvmtxt, "sp2hostfile() need at least %d nodes\n", count+1);
328
pvmlogperror(pvmtxt);
331
(void)tmpnam(nodefile);
332
if (!(tmpfp = fopen(nodefile, "w"))) {
333
sprintf(pvmtxt, "sp2hostfile() fopen %s", nodefile);
334
pvmlogperror(pvmtxt);
337
if (pvmdebmask & PDMNODE) {
338
sprintf(pvmtxt, "sp2hostfile() POE host file: %s\n", nodefile);
341
for (i = first; i < count + first; i++)
342
fprintf(tmpfp, "%s\n", nodelist[i]);
349
* find a set of free nodes from nodelist; assign ptype sequentially,
350
* only tasks spawned together get the same ptype
354
int count; /* number of nodes requested */
355
int ptid; /* parent's tid */
357
struct nodeset *sp, *newp, *sp2;
361
if (!(newp = TALLOC(1, struct nodeset, "nsets"))) {
362
pvmlogerror("mpp_new() can't get memory\n");
365
BZERO((char*)newp, sizeof(struct nodeset));
367
newp->n_size = count;
368
for (sp = busynodes->n_link; sp != busynodes; sp = sp->n_link) {
369
if (sp->n_first - last > count)
371
last = sp->n_first + sp->n_size - 1;
373
if (sp->n_link == busynodes && partsize - last > count)
376
if (ptype <= sp->n_ptype)
377
ptype = sp->n_ptype + 1;
379
if (sp == busynodes && partsize - last <= count) {
380
pvmlogerror("mpp_new() not enough nodes in partition\n");
382
return (struct nodeset *)0;
384
for (sp2 = busynodes->n_link; sp2 != busynodes; sp2 = sp2->n_link)
385
if ((sp2->n_ptype & ptypemask) == (ptype & ptypemask))
387
if (sp2 != busynodes || ptype == NPARTITIONS) {
388
for (ptype = 0; ptype < NPARTITIONS; ptype++) {
389
for (sp2 = busynodes->n_link; sp2 != busynodes; sp2 = sp2->n_link)
390
if ((sp2->n_ptype & ptypemask) == (ptype & ptypemask))
392
if (sp2 == busynodes)
395
if (ptype == NPARTITIONS) {
396
pvmlogerror("mpp_new() out of ptypes: too many spawns\n");
397
return (struct nodeset *)0;
402
if (pvmdebmask & PDMNODE) {
403
sprintf(pvmtxt, "mpp_new() %d nodes %d ... ptype=%d ptid=%x\n",
404
count, last+1, ptype, ptid);
407
newp->n_first = last + 1;
409
if (!sp2hostfile(newp->n_first, count)) {
411
return (struct nodeset *)0;
414
newp->n_ptype = ptype;
416
newp->n_alive = count - 1;
417
LISTPUTBEFORE(sp, newp, n_link, n_rlink);
424
* remove node/ptype from active list; if tid is the last to go, shutdown
425
* pvmhost's socket, but do not destroy the node set because pvmhost may
426
* not exit immediately. To avoid a race condition, let mpp_output()
441
ptype = TIDTOTYPE(tid);
442
tp->t_txq = 0; /* don't free pvmhost's txq */
443
for (sp = busynodes->n_link; sp != busynodes; sp = sp->n_link) {
444
if ((sp->n_ptype & ptypemask) == ptype) {
446
if (pvmdebmask & PDMNODE) {
447
sprintf(pvmtxt, "mpp_free() t%x type=%ld alive=%d\n",
448
tid, sp->n_ptype, sp->n_alive);
451
if (--sp->n_alive == 0) {
452
if (tp2 = task_find(sp->n_ptid)) {
453
tp2->t_flag |= TF_CLOSE;
454
if (tp2->t_sock != -1) {
456
wrk_fds_delete(tp2->t_sock, 3);
457
(void)close(tp2->t_sock);
460
shutdown(tp2->t_sock, 1);
461
/* close stdout after pvmhost dies */
462
tp2->t_out = tp->t_out;
466
LISTDELETE(sp, n_link, n_rlink);
470
tp->t_out = -1; /* don't free shared stdout if alive > 0 */
474
sprintf(pvmtxt, "mpp_free() t%x not active\n", tid);
480
/* load executable onto the given set of nodes */
483
struct waitc_spawn *wxp;
485
int flags = 0; /* exec options */
486
char *name; /* executable */
487
char **argv; /* arg list (argv[-1] must be there) */
488
int count; /* how many */
489
int *tids; /* array to store new tids */
490
int ptid; /* parent task ID */
491
int nenv; /* length of environment */
492
char **envp; /* environment strings */
493
int ptypepart; /* type field */
496
struct pkt *hosttxq; /* out-going queue of pvmhost */
499
char c[128]; /* buffer to store count, name.host */
503
char path[MAXPATHLEN];
506
int hostout; /* stdout of pvmhost */
507
struct hostd *hp = hosts->ht_hosts[hosts->ht_local];
508
int hostpid; /* Unix pid of pvmhost */
509
char htid[128]; /* buffer to store pvmhost tid */
512
static char *nullep[] = { "", 0 };
514
/* -- initialize some variables from the waitc_spawn struct -- */
516
name = wxp->w_argv[0];
518
count = wxp->w_veclen;
526
eplist = CINDEX(name, '/') ? nullep : epaths;
528
for (ep = eplist; *ep; ep++) {
529
/* search for file */
530
(void)strcpy(path, *ep);
532
(void)strcat(path, "/");
533
(void)strncat(path, name, sizeof(path) - strlen(path) - 1);
535
if (stat(path, &sb) == -1
536
|| ((sb.st_mode & S_IFMT) != S_IFREG)
537
|| !(sb.st_mode & S_IEXEC)) {
538
if (pvmdebmask & PDMTASK) {
539
sprintf(pvmtxt, "mpp_load() stat failed <%s>\n", path);
545
if (!(sp = mpp_new(count+1, ptid))) {
549
ptypepart = (sp->n_ptype << (ffs(tidtmask) - 1)) | TIDONNODE;
552
for (nargs = 0; argv[nargs]; nargs++);
555
/* ar[-1], poe, -procs, #, -euilib, us, -hfile fname */
556
nargs += MPIARGC + 1;
557
av = TALLOC(nargs + 1, char*, "argv");
558
av++; /* reserve room for debugger */
559
BZERO((char*)av, nargs * sizeof(char*));
564
av[--nargs] = nodefile;
565
av[--nargs] = MPIOPT3;
567
av[--nargs] = defaultpool;
568
av[--nargs] = MPIOPT4;
570
av[--nargs] = mpiadapter;
571
av[--nargs] = MPIOPT2;
572
sprintf(c, "%d", count+1);
574
av[--nargs] = MPIOPT1;
575
for (j = 2; j < nargs; j++)
576
av[j] = argv[j - 1]; /* poe name argv -procs # -euilib us */
578
if ((sock = mksock()) == -1) {
583
if (flags & PvmTaskDebug)
584
av++; /* pdbx name -procs # -euilib us */
585
/* if (err = forkexec(flags, av[0], av, 0, (char **)0, &tp))
587
if (err = forkexec(flags, av[0], av, 0, (char **)0, 0,
591
PVM_FREE(tp->t_a_out);
592
sprintf(c, "%s.host", name);
593
tp->t_a_out = STRALLOC(c);
594
sp->n_ptid = tp->t_tid; /* pvmhost's tid */
599
sprintf(htid, "PVMHTID=%d", tp->t_tid);
602
sp2pvminfo[0] = TDPROTOCOL;
603
sp2pvminfo[1] = myhostpart + ptypepart;
604
sp2pvminfo[2] = ptid;
605
sp2pvminfo[3] = MAXFRAGSIZE;
606
sp2pvminfo[4] = myndf;
607
sp2pvminfo[5] = partsize;
608
sp2pvminfo[6] = wxp->w_outtid;
609
sp2pvminfo[7] = wxp->w_outtag;
610
sp2pvminfo[8] = wxp->w_outctx;
611
sp2pvminfo[9] = wxp->w_trctid;
612
sp2pvminfo[10] = wxp->w_trctag;
613
sp2pvminfo[11] = wxp->w_trcctx;
617
if (sockconn(sock, tp, pvminfo) == -1) {
623
/* XXX task may not be on same host; can't do auth with tmp file */
624
tp->t_flag |= TF_CONN;
625
if (pvmdebmask & PDMTASK) {
626
sprintf(pvmtxt, "mpp_load() %d type=%d ptid=%x t%x...\n",
627
count, sp->n_ptype, ptid, myhostpart + ptypepart);
631
/* create new task structs */
633
for (j = 0; j < count; j++) {
634
tp = task_new(myhostpart + ptypepart + j);
635
tp->t_a_out = STRALLOC(name);
637
tp->t_flag |= TF_CONN; /* no need for the auth crap */
640
tp->t_txq = hosttxq; /* node tasks share pvmhost's txq */
641
tp->t_out = hostout; /* and stdout */
642
tp->t_pid = hostpid; /* pvm_kill should kill pvmhost */
643
tp->t_outtid = wxp->w_outtid; /* catch stdout/stderr */
644
tp->t_outtag = wxp->w_outtag;
645
tp->t_outctx = wxp->w_outctx;
649
if (pvmdebmask & PDMTASK) {
650
sprintf(pvmtxt, "mpp_load() didn't find <%s>\n", name);
656
for (j = 0; j < count; j++)
664
/* kill poe process */
672
char nname[128]; /* node name */
673
char comm[512]; /* command to issue */
674
char *hfn; /* host file name */
676
char *av[8]; /* for rsh args */
678
int pid = -1; /* pid of rsh */
681
if ((hfn = getenv("MP_HOSTFILE")) || stat(hfn = "host.list", &sb) != -1) {
682
if (fp = fopen(hfn, "r")) {
683
for (i = 0; i < node; i++)
684
fscanf(fp, "%s", nname);
686
if ((pid = fork()) == -1) {
687
pvmlogperror("sp2kill() fork");
692
av[ac++] = "/usr/bin/rsh";
694
av[ac++] = "poekill";
697
for (i = getdtablesize(); --i > 2; )
703
pvmlogperror("sp2kill() fopen");
706
pvmlogerror("sp2kill() no host file");
718
if (TIDISNODE(tp->t_tid)) {
719
if (signum == SIGTERM || signum == SIGKILL) {
720
/* sp2kill(tp->t_a_out, tp->t_tid & tidnmask); */
721
(void)kill(tp->t_pid, signum);
726
sprintf(pvmtxt,"mpp_kill() signal %d to node t%x ignored\n",
731
(void)kill(tp->t_pid, signum);
736
* Add pvmhost's socket to wfds if there are packets waiting to
737
* be sent to a related node task. Node tasks have no sockets;
738
* they share pvmhost's packet queue (txq). Pvmhost simply
739
* forwards any packets it receives to the appropriate node.
743
mpp_output(dummy1, dummy2)
747
struct nodeset *sp, *sp2;
751
for (sp = busynodes->n_link; sp != busynodes; sp = sp->n_link)
752
if ((tp = task_find(sp->n_ptid))) {
753
if (tp->t_txq->pk_link->pk_buf && tp->t_sock != -1)
754
wrk_fds_add(tp->t_sock, 2);
757
sprintf(pvmtxt, "mpp_output() pvmhost %d died!\n", sp->n_ptype);
759
/* clean up tasks it serves */
760
ptype = sp->n_ptype & ptypemask;
761
for (tp = locltasks->t_link; tp != locltasks; tp = tp->t_link)
762
if (TIDISNODE(tp->t_tid) && TIDTOTYPE(tp->t_tid) == ptype) {
765
task_cleanup(tp->t_link);
766
task_free(tp->t_link);
769
/* pvmhost has died, destroy the node set */
772
LISTDELETE(sp2, n_link, n_rlink);
779
/* replace tm_connect and tm_conn2 */
786
int pvminfo[SIZEHINFO]; /* host info */
787
int ptypepart; /* type field */
789
if (pvmdebmask & PDMNODE) {
790
sprintf(pvmtxt, "mpp_conn() pvmhost %x", tp2->t_tid);
793
tp2->t_sock = tp->t_sock;
794
tp2->t_sad = tp->t_sad;
795
tp2->t_salen = tp->t_salen;
796
tp2->t_flag |= TF_CONN;
798
for (sp = busynodes->n_link; sp != busynodes; sp = sp->n_link)
799
if (sp->n_ptid == tp2->t_tid)
801
if (sp == busynodes) {
802
pvmlogerror("mpp_conn() task is not pvmhost\n");
805
ptypepart = (sp->n_ptype << (ffs(tidtmask) - 1)) | TIDONNODE;
806
if (write(tp2->t_sock, sp2pvminfo, sizeof(sp2pvminfo))
807
!= sizeof(sp2pvminfo)) {
808
pvmlogperror("mpp_conn() write");
818
* Create socket to talk to pvmhost.
819
* Return socket descriptor if successful, -1 otherwise.
824
struct hostd *hp = hosts->ht_hosts[hosts->ht_local];
825
struct sockaddr_in sin;
831
if ((sock = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
832
pvmlogperror("mksock() socket");
836
if (bind(sock, (struct sockaddr*)&sin, sizeof(sin)) == -1) {
837
pvmlogperror("mksock() bind");
842
if (getsockname(sock, (struct sockaddr*)&sin, &cc) == -1) {
843
pvmlogperror("mksock() getsockname");
847
if (listen(sock, 1) == -1) {
848
pvmlogperror("mksock() listen");
853
p = inadport_hex(&sin);
854
sprintf(buf, "PVMSOCK=%s", p);
863
* Wait for connect request from pvmhost and establish connection.
864
* Return 0 if successful, -1 otherwise.
865
* Close listening socket.
868
sockconn(sock, tp, hinfo)
869
int sock; /* listening post */
870
struct task *tp; /* pvm host */
871
int hinfo[]; /* host info to pass along */
875
if ((tp->t_sock = accept(sock, (struct sockaddr*)&tp->t_sad,
876
&tp->t_salen)) == -1) {
877
pvmlogperror("sockconn() accept");
880
if (pvmdebmask & (PDMPACKET|PDMTASK)) {
881
sprintf(pvmtxt, "sockconn() accept from %s sock %d\n",
882
inadport_decimal(&tp->t_sad), tp->t_sock);
888
if (setsockopt(tp->t_sock, IPPROTO_TCP, TCP_NODELAY,
889
(char*)&i, sizeof(int)) == -1)
890
pvmlogperror("sockconn() setsockopt");
893
if (write(tp->t_sock, hinfo, SIZEHINFO*sizeof(int))
894
!= SIZEHINFO*sizeof(int)) {
895
pvmlogperror("sockconn: write");
898
if ((i = fcntl(tp->t_sock, F_GETFL, 0)) == -1)
899
pvmlogperror("sockconn: fcntl");
902
(void)fcntl(tp->t_sock, F_SETFL, i);
904
wrk_fds_add(tp->t_sock, 1);