~ubuntu-branches/ubuntu/raring/voxbo/raring

« back to all changes in this revision

Viewing changes to scheduler/processvbx.cpp

  • Committer: Bazaar Package Importer
  • Author(s): Michael Hanke
  • Date: 2010-06-06 11:33:11 UTC
  • Revision ID: james.westby@ubuntu.com-20100606113311-v3c13imdkkd5n7ae
Tags: upstream-1.8.5~svn1172
ImportĀ upstreamĀ versionĀ 1.8.5~svn1172

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
 
 
2
// processvbx.cpp
 
3
// process vbx (message) files in the service of either scheduler or vbsingle
 
4
// Copyright (c) 1998-2010 by The VoxBo Development Team
 
5
 
 
6
// This file is part of VoxBo
 
7
// 
 
8
// VoxBo is free software: you can redistribute it and/or modify it
 
9
// under the terms of the GNU General Public License as published by
 
10
// the Free Software Foundation, either version 3 of the License, or
 
11
// (at your option) any later version.
 
12
// 
 
13
// VoxBo is distributed in the hope that it will be useful, but
 
14
// WITHOUT ANY WARRANTY; without even the implied warranty of
 
15
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 
16
// General Public License for more details.
 
17
// 
 
18
// You should have received a copy of the GNU General Public License
 
19
// along with VoxBo.  If not, see <http://www.gnu.org/licenses/>.
 
20
// 
 
21
// For general information on VoxBo, including the latest complete
 
22
// source code and binary distributions, manual, and associated files,
 
23
// see the VoxBo home page at: http://www.voxbo.org/
 
24
//
 
25
// original version written by Dan Kimberg
 
26
 
 
27
using namespace std;
 
28
 
 
29
#include <sys/signal.h>
 
30
#include "vbutil.h"
 
31
#include "vbprefs.h"
 
32
#include "vbjobspec.h"
 
33
 
 
34
extern vector<VBHost> hostlist;
 
35
 
 
36
void process_vbx();
 
37
int process_jobrunning(int snum,int jnum,pid_t pid,pid_t childpid);
 
38
int process_setseqinfo(int ss,string newstatusinfo);
 
39
int process_setjobinfo(int ss,int jj,string newstatusinfo);
 
40
int process_email(string recipient,string fname);
 
41
int process_adminemail(string fname);
 
42
int process_hostupdate(string hh);
 
43
int process_missing(int snum,int jnum,pid_t pid);
 
44
void setjobinfo_update_hostlist(VBJobSpec newjs);
 
45
 
 
46
void
 
47
process_vbx()
 
48
{
 
49
  tokenlist line;
 
50
  vglob vg("*.vbx");
 
51
  for (size_t i=0; i<vg.size(); i++) {
 
52
    line.ParseFirstLine(vg[i]);
 
53
    if (line[0]=="setjobinfo")
 
54
      process_setjobinfo(strtol(line[1]),strtol(line[2]),line.Tail(3));
 
55
    else if (line[0]=="jobrunning")
 
56
      process_jobrunning(strtol(line[1]),strtol(line[2]),strtol(line[3]),strtol(line[4]));
 
57
    else if (line[0]=="setseqinfo")
 
58
      process_setseqinfo(strtol(line[1]),line.Tail(2));
 
59
    else if (line[0]=="email")
 
60
      process_email(line[1],gb.gl_pathv[i]);
 
61
    else if (line[0]=="adminemail")
 
62
      process_adminemail(gb.gl_pathv[i]);
 
63
    else if (line[0]=="missing")
 
64
      process_missing(strtol(line[1]),strtol(line[2]),strtol(line[3]));
 
65
    else if (line[0]=="hostupdate")
 
66
      process_hostupdate(line.Tail());
 
67
    else { 
 
68
      printf("[E] %s invalid vbx file %s\n",timedate().c_str(),vg[i].c_str());
 
69
    }
 
70
    unlink(vg[i].c_str());
 
71
    //rename(gb.gl_pathv[i],((string)"tmp/"+gb.gl_pathv[i]).c_str());
 
72
  }
 
73
}
 
74
 
 
75
int
 
76
process_missing(int snum,int jnum,pid_t pid)
 
77
{
 
78
  VBJobSpec *js=findjob(snum,jnum);
 
79
  if (!js)
 
80
    return 101;
 
81
  js->missingcount++;
 
82
  if (js->missingcount>2) {
 
83
    printf("[E] %s job %08d-%05d has gone missing (count %d) from host %s\n",
 
84
           timedate().c_str(),snum,jnum,js->missingcount,js->hostname.c_str());
 
85
    process_setjobinfo(snum,jnum,"status W");
 
86
  }
 
87
  return 0;
 
88
}
 
89
 
 
90
int
 
91
process_jobrunning(int snum,int jnum,pid_t pid,pid_t childpid)
 
92
{
 
93
  // set it in the main list
 
94
  process_setjobinfo(snum,jnum,(string)"pid "+strnum(pid));
 
95
  // set it in the running list and return
 
96
  for (HI h=hostlist.begin(); h!=hostlist.end(); h++) {
 
97
    for (JI j=h->speclist.begin(); j!=h->speclist.end(); j++) {
 
98
      if (snum==j->snum && jnum==j->jnum) {
 
99
        j->pid=pid;
 
100
        j->childpid=childpid;
 
101
        return 0;
 
102
      }
 
103
    }
 
104
  }
 
105
  // shouldn't happen unless a host disappears
 
106
  printf("[E] %s bad jobrunning message: %08d-%08d, %d %d\n",
 
107
         timedate().c_str(),snum,jnum,pid,childpid);
 
108
  return 1;
 
109
}
 
110
 
 
111
int
 
112
process_setjobinfo(int ss,int jj,string newstatusinfo)
 
113
{
 
114
  // update the file
 
115
  char jobfile[STRINGLEN];
 
116
  int err;
 
117
  sprintf(jobfile,"%08d/%05d.job",ss,jj);
 
118
  if ((err=appendline(jobfile,newstatusinfo))) {
 
119
    printf("[E] %s couldn't update job file %s (%d,%d,%s) [%d]\n",
 
120
           timedate().c_str(),jobfile,
 
121
           ss,jj,newstatusinfo.c_str(),err);
 
122
    return 101;
 
123
  }
 
124
  VBSequence seq(ss,jj);
 
125
  VBJobSpec *oldjob;
 
126
  // read the updated file
 
127
  if (seq.speclist.size()!=1) {
 
128
    printf("[E] %s couldn't read updated job file (%d,%d,%s)\n",timedate().c_str(),
 
129
           ss,jj,newstatusinfo.c_str());
 
130
    return 102;
 
131
  }
 
132
  // update it in hostlist (if running) and main list (regardless)
 
133
  if ((oldjob=findjob(ss,jj))) {
 
134
    if (oldjob->status=='R')
 
135
      setjobinfo_update_hostlist(seq.speclist[0]);
 
136
    *oldjob=seq.speclist[0];
 
137
  }
 
138
  else
 
139
    printf("[E] %s couldn't find job record (%d,%d,%s)\n",timedate().c_str(),
 
140
           ss,jj,newstatusinfo.c_str());
 
141
  return 0;
 
142
}
 
143
 
 
144
void
 
145
setjobinfo_update_hostlist(VBJobSpec newjs)
 
146
{
 
147
  // update or remove on hostlist
 
148
  for (HI h=hostlist.begin(); h!=hostlist.end(); h++) {
 
149
    for (JI j=h->speclist.begin(); j!=h->speclist.end(); j++) {
 
150
      if (newjs.snum==j->snum && newjs.jnum==j->jnum) {
 
151
        if (newjs.status!='R')
 
152
          h->speclist.erase(j);
 
153
        else
 
154
          *j=newjs;
 
155
        return;
 
156
      }
 
157
    }
 
158
  }
 
159
  printf("[E] %s couldn't update %08d-%05d on any host, even %s\n",
 
160
         timedate().c_str(),newjs.snum,newjs.jnum,newjs.hostname.c_str());
 
161
}
 
162
 
 
163
int
 
164
process_setseqinfo(int ss,string newstatusinfo)
 
165
{
 
166
  // update the file
 
167
  char seqfile[STRINGLEN];
 
168
  sprintf(seqfile,"%08d/%d.seq",ss,ss);
 
169
  if (appendline(seqfile,newstatusinfo)) {
 
170
    printf("[E] %s couldn't update sequence file\n",timedate().c_str());
 
171
    return 101;
 
172
  }
 
173
  VBSequence js,*oldseq;
 
174
  js.LoadSequence(xdirname(seqfile));
 
175
  // find a home for it
 
176
  if ((oldseq=findsequence(ss)))
 
177
    *oldseq=js;
 
178
  return 0;
 
179
}
 
180
 
 
181
int
 
182
process_hostupdate(string hh)
 
183
{
 
184
  string hostinfo,field;
 
185
 
 
186
  VBHost *tmph=NULL;
 
187
  tokenlist args,argx;
 
188
  args.SetQuoteChars("[<(\"'");
 
189
  argx.SetQuoteChars("[<(\"'");
 
190
  args.ParseLine(hh);
 
191
  // first argument better be full hostname
 
192
  argx.ParseLine(args[0]);
 
193
  if (argx[0]!="hostname") {
 
194
    printf("[E] %s malformed host update\n",timedate().c_str());
 
195
    return 101;
 
196
  }
 
197
  string hostname=argx[1];
 
198
  for (int i=0; i<(int)hostlist.size(); i++) {
 
199
    if (hostlist[i].hostname==hostname) {
 
200
      tmph=&(hostlist[i]);
 
201
    }
 
202
  }
 
203
  if (tmph==NULL) {
 
204
    printf("[E] %s invalid host update from %s\n",
 
205
           timedate().c_str(),hostname.c_str());
 
206
    return 101;
 
207
  }
 
208
 
 
209
  tmph->resources.clear();
 
210
  tmph->provides.clear();
 
211
  for (int i=1; i<args.size(); i++) {
 
212
    argx.ParseLine(args[i]);
 
213
    field=argx[0];
 
214
    if (field=="hostname")
 
215
      tmph->hostname=argx[1];
 
216
    else if (field=="shortname")
 
217
      tmph->shortname=argx[1];
 
218
    else if (field=="pri")
 
219
      tmph->currentpri=strtol(argx[1]);
 
220
    else if (field=="load")
 
221
      tmph->loadaverage=strtod(argx[1]);
 
222
    else if (field=="resource") {
 
223
      VBResource rr;
 
224
      rr.name=argx[1];
 
225
      rr.command=argx[2];
 
226
      rr.cnt=strtol(argx[3]);
 
227
      tmph->resources.push_back(rr);
 
228
    }
 
229
    else if (field=="provides") {
 
230
      VBResource rr;
 
231
      rr.type=argx[1];
 
232
      rr.name=argx[2];
 
233
      rr.cnt=strtol(argx[3]);
 
234
      tmph->provides.push_back(rr);
 
235
    }
 
236
    else if (field=="total_cpus")
 
237
      tmph->total_cpus=strtol(argx[1]);
 
238
    else if (field=="taken_cpus")
 
239
      tmph->taken_cpus=strtol(argx[1]);
 
240
    else if (field=="avail_cpus")
 
241
      tmph->avail_cpus=strtol(argx[1]);
 
242
    else if (field=="state")
 
243
      tmph->status=argx[1];
 
244
  }
 
245
  tmph->Update();
 
246
  tmph->lastresponse=time(NULL);
 
247
  tmph->lastupdate=time(NULL);
 
248
  return 0;
 
249
}
 
250