3
// process vbx (message) files in the service of either scheduler or vbsingle
4
// Copyright (c) 1998-2010 by The VoxBo Development Team
6
// This file is part of VoxBo
8
// VoxBo is free software: you can redistribute it and/or modify it
9
// under the terms of the GNU General Public License as published by
10
// the Free Software Foundation, either version 3 of the License, or
11
// (at your option) any later version.
13
// VoxBo is distributed in the hope that it will be useful, but
14
// WITHOUT ANY WARRANTY; without even the implied warranty of
15
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16
// General Public License for more details.
18
// You should have received a copy of the GNU General Public License
19
// along with VoxBo. If not, see <http://www.gnu.org/licenses/>.
21
// For general information on VoxBo, including the latest complete
22
// source code and binary distributions, manual, and associated files,
23
// see the VoxBo home page at: http://www.voxbo.org/
25
// original version written by Dan Kimberg
29
#include <sys/signal.h>
32
#include "vbjobspec.h"
34
extern vector<VBHost> hostlist;
37
// Forward declarations: one handler per vbx message keyword dispatched
// below, plus the hostlist helper that process_setjobinfo calls.
int process_jobrunning(int snum,int jnum,pid_t pid,pid_t childpid);
38
int process_setseqinfo(int ss,string newstatusinfo);
39
int process_setjobinfo(int ss,int jj,string newstatusinfo);
40
int process_email(string recipient,string fname);
41
int process_adminemail(string fname);
42
int process_hostupdate(string hh);
43
int process_missing(int snum,int jnum,pid_t pid);
44
void setjobinfo_update_hostlist(VBJobSpec newjs);
51
// Walk every entry of vg, parse it into `line` with ParseFirstLine, and
// dispatch on the leading keyword (line[0]) to the matching handler.
// Numeric arguments are converted with strtol; free-form remainders are
// passed through with line.Tail().
for (size_t i=0; i<vg.size(); i++) {
52
line.ParseFirstLine(vg[i]);
53
// setjobinfo <snum> <jnum> <status text...>
if (line[0]=="setjobinfo")
54
process_setjobinfo(strtol(line[1]),strtol(line[2]),line.Tail(3));
55
// jobrunning <snum> <jnum> <pid> <childpid>
else if (line[0]=="jobrunning")
56
process_jobrunning(strtol(line[1]),strtol(line[2]),strtol(line[3]),strtol(line[4]));
57
// setseqinfo <snum> <status text...>
else if (line[0]=="setseqinfo")
58
process_setseqinfo(strtol(line[1]),line.Tail(2));
59
// NOTE(review): the email/adminemail branches pass gb.gl_pathv[i], while
// everything else in this loop addresses the file as vg[i] -- confirm gb
// is populated and indexes the same files in the same order.
else if (line[0]=="email")
60
process_email(line[1],gb.gl_pathv[i]);
61
else if (line[0]=="adminemail")
62
process_adminemail(gb.gl_pathv[i]);
63
// missing <snum> <jnum> <pid>
else if (line[0]=="missing")
64
process_missing(strtol(line[1]),strtol(line[2]),strtol(line[3]));
65
// hostupdate <field list...>
else if (line[0]=="hostupdate")
66
process_hostupdate(line.Tail());
68
// NOTE(review): the line just before this printf is not visible here;
// presumably an `else`, so this fires only for unrecognized keywords.
printf("[E] %s invalid vbx file %s\n",timedate().c_str(),vg[i].c_str());
70
// delete the file named by vg[i]
unlink(vg[i].c_str());
71
//rename(gb.gl_pathv[i],((string)"tmp/"+gb.gl_pathv[i]).c_str());
76
// Handle a "missing" message for job (snum,jnum): look the job up with
// findjob and, once its missingcount exceeds 2, log the event and append
// "status W" to the job through process_setjobinfo.
// pid is not used in the lines visible here.
process_missing(int snum,int jnum,pid_t pid)
78
VBJobSpec *js=findjob(snum,jnum);
82
// NOTE(review): the lines between the lookup and this test are not
// visible here -- confirm js is null-checked (and missingcount updated)
// before it is dereferenced.
if (js->missingcount>2) {
83
printf("[E] %s job %08d-%05d has gone missing (count %d) from host %s\n",
84
timedate().c_str(),snum,jnum,js->missingcount,js->hostname.c_str());
85
process_setjobinfo(snum,jnum,"status W");
91
// Handle a "jobrunning" message: record pid on the job via
// process_setjobinfo ("pid <pid>"), then search every host's speclist for
// the entry matching (snum,jnum) and store childpid on it.
process_jobrunning(int snum,int jnum,pid_t pid,pid_t childpid)
93
// set it in the main list
94
process_setjobinfo(snum,jnum,(string)"pid "+strnum(pid));
95
// set it in the running list and return
96
for (HI h=hostlist.begin(); h!=hostlist.end(); h++) {
97
for (JI j=h->speclist.begin(); j!=h->speclist.end(); j++) {
98
if (snum==j->snum && jnum==j->jnum) {
100
j->childpid=childpid;
105
// shouldn't happen unless a host disappears
106
// NOTE(review): the job number is printed as %08d here but as %05d in the
// other messages of this file; pid/childpid are pid_t printed with %d.
printf("[E] %s bad jobrunning message: %08d-%08d, %d %d\n",
107
timedate().c_str(),snum,jnum,pid,childpid);
112
// Append newstatusinfo to the job file "<ss, 8 digits>/<jj, 5 digits>.job",
// re-read the job through VBSequence seq(ss,jj), and copy the refreshed
// spec over the in-memory record returned by findjob(ss,jj). Each failure
// (append, re-read, lookup) is reported with an "[E]" line on stdout.
process_setjobinfo(int ss,int jj,string newstatusinfo)
115
char jobfile[STRINGLEN];
117
// path is relative, so this depends on the process's working directory
sprintf(jobfile,"%08d/%05d.job",ss,jj);
118
// a nonzero result from appendline is treated as an error code
if ((err=appendline(jobfile,newstatusinfo))) {
119
printf("[E] %s couldn't update job file %s (%d,%d,%s) [%d]\n",
120
timedate().c_str(),jobfile,
121
ss,jj,newstatusinfo.c_str(),err);
124
VBSequence seq(ss,jj);
126
// read the updated file
127
// exactly one job spec is expected back from the reload
if (seq.speclist.size()!=1) {
128
printf("[E] %s couldn't read updated job file (%d,%d,%s)\n",timedate().c_str(),
129
ss,jj,newstatusinfo.c_str());
132
// update it in hostlist (if running) and main list (regardless)
133
if ((oldjob=findjob(ss,jj))) {
134
// the check uses the status held before the update, so a job leaving
// 'R' still reaches the hostlist helper
if (oldjob->status=='R')
135
setjobinfo_update_hostlist(seq.speclist[0]);
136
*oldjob=seq.speclist[0];
139
printf("[E] %s couldn't find job record (%d,%d,%s)\n",timedate().c_str(),
140
ss,jj,newstatusinfo.c_str());
145
// Locate newjs (matched on snum and jnum) in each host's speclist; when
// its new status is not 'R' the entry is erased from that host. The
// closing printf reports that the job could not be updated on any host.
setjobinfo_update_hostlist(VBJobSpec newjs)
147
// update or remove on hostlist
148
for (HI h=hostlist.begin(); h!=hostlist.end(); h++) {
149
for (JI j=h->speclist.begin(); j!=h->speclist.end(); j++) {
150
if (newjs.snum==j->snum && newjs.jnum==j->jnum) {
151
if (newjs.status!='R')
152
// NOTE(review): erase(j) invalidates j; the following lines are not
// visible here -- confirm the function leaves the loop right after
// rather than continuing with j++.
h->speclist.erase(j);
159
printf("[E] %s couldn't update %08d-%05d on any host, even %s\n",
160
timedate().c_str(),newjs.snum,newjs.jnum,newjs.hostname.c_str());
164
// Append newstatusinfo to the sequence file "<ss, 8 digits>/<ss>.seq",
// reload the sequence with LoadSequence(xdirname(seqfile)), and look up
// the in-memory sequence with findsequence(ss). What is done with the
// lookup result is on lines not visible here.
process_setseqinfo(int ss,string newstatusinfo)
167
char seqfile[STRINGLEN];
168
sprintf(seqfile,"%08d/%d.seq",ss,ss);
169
// a nonzero result from appendline is treated as failure
if (appendline(seqfile,newstatusinfo)) {
170
printf("[E] %s couldn't update sequence file\n",timedate().c_str());
173
VBSequence js,*oldseq;
// xdirname presumably yields the directory part of seqfile -- TODO confirm
174
js.LoadSequence(xdirname(seqfile));
175
// find a home for it
176
if ((oldseq=findsequence(ss)))
182
// Apply a host status update. The first element of args must parse as
// "hostname <name>", and <name> is compared against the entries of
// hostlist. The host's resources and provides lists are cleared and
// rebuilt from the remaining elements, scalar fields are overwritten, and
// lastresponse/lastupdate are set to the current time.
// NOTE(review): how args is filled from hh, and where tmph and field are
// assigned, is on lines not visible here.
process_hostupdate(string hh)
184
string hostinfo,field;
188
// both tokenizers get the same set of quote characters
args.SetQuoteChars("[<(\"'");
189
argx.SetQuoteChars("[<(\"'");
191
// first argument better be full hostname
192
argx.ParseLine(args[0]);
193
if (argx[0]!="hostname") {
194
printf("[E] %s malformed host update\n",timedate().c_str());
197
string hostname=argx[1];
// search hostlist for an entry whose hostname equals the one in the update
198
for (int i=0; i<(int)hostlist.size(); i++) {
199
if (hostlist[i].hostname==hostname) {
204
printf("[E] %s invalid host update from %s\n",
205
timedate().c_str(),hostname.c_str());
209
// drop the old resource/provides lists; they are rebuilt from the update
tmph->resources.clear();
210
tmph->provides.clear();
// element 0 was handled above, so start at 1; each element is re-split
// with argx
211
for (int i=1; i<args.size(); i++) {
212
argx.ParseLine(args[i]);
214
if (field=="hostname")
215
tmph->hostname=argx[1];
216
else if (field=="shortname")
217
tmph->shortname=argx[1];
218
else if (field=="pri")
219
tmph->currentpri=strtol(argx[1]);
220
else if (field=="load")
221
tmph->loadaverage=strtod(argx[1]);
222
// resource / provides: the count comes from argx[3]; the other members
// of rr are set on lines not visible here
else if (field=="resource") {
226
rr.cnt=strtol(argx[3]);
227
tmph->resources.push_back(rr);
229
else if (field=="provides") {
233
rr.cnt=strtol(argx[3]);
234
tmph->provides.push_back(rr);
236
else if (field=="total_cpus")
237
tmph->total_cpus=strtol(argx[1]);
238
else if (field=="taken_cpus")
239
tmph->taken_cpus=strtol(argx[1]);
240
else if (field=="avail_cpus")
241
tmph->avail_cpus=strtol(argx[1]);
242
else if (field=="state")
243
tmph->status=argx[1];
246
// stamp both fields with the current time
tmph->lastresponse=time(NULL);
247
tmph->lastupdate=time(NULL);