// This file is part of BOINC.
// http://boinc.berkeley.edu
// Copyright (C) 2008 University of California
//
// BOINC is free software; you can redistribute it and/or modify it
// under the terms of the GNU Lesser General Public License
// as published by the Free Software Foundation,
// either version 3 of the License, or (at your option) any later version.
//
// BOINC is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
// See the GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with BOINC. If not, see <http://www.gnu.org/licenses/>.

// Matchmaker scheduling code
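//
// Overview: the feeder keeps a shared-memory cache of sendable results
// (ssp->wu_results). For each scheduler request we scan a window of that
// cache, score each job against the requesting host, and accumulate the
// best-scoring feasible jobs in a JOB_SET, which is then sent.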

#include "boinc_db.h"
#include "error_numbers.h"

#include "sched_main.h"
#include "sched_config.h"
#include "sched_hr.h"
#include "sched_msgs.h"
#include "sched_shmem.h"
#include "sched_send.h"
#include "sched_version.h"
#include "sched_types.h"

#include "sched_score.h"

// reread result from DB, make sure it's still unsent
// TODO: from here to add_result_to_reply()
// (which updates the DB record) should be a transaction
//
int read_sendable_result(DB_RESULT& result) {
    int retval = result.lookup_id(result.id);
    if (retval) {
        log_messages.printf(MSG_CRITICAL,
            "[RESULT#%d] result.lookup_id() failed %d\n",
            result.id, retval
        );
        return retval;
    }
    if (result.server_state != RESULT_SERVER_STATE_UNSENT) {
        log_messages.printf(MSG_NORMAL,
            "[RESULT#%d] expected to be unsent; instead, state is %d\n",
            result.id, result.server_state
        );
        return ERR_BAD_RESULT_STATE;
    }
    return 0;
}
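
// A sketch of the transaction suggested in the TODO above; it assumes
// DB_CONN's start_transaction()/commit_transaction() helpers (the
// rollback name is an assumption, and wu/bavp stand for the caller's
// locals), so this is illustrative, not current code:
//
//     boinc_db.start_transaction();
//     retval = read_sendable_result(result);
//     if (!retval) retval = add_result_to_reply(result, wu, bavp, false);
//     if (retval) boinc_db.rollback_transaction();
//     else boinc_db.commit_transaction();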

bool wu_is_infeasible_slow(
    WU_RESULT& wu_result, SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply
) {
    char buf[256];
    int retval, n;
    DB_RESULT result;

    // Don't send if we've already sent a result of this WU to this user.
    //
    if (config.one_result_per_user_per_wu) {
        sprintf(buf,
            "where workunitid=%d and userid=%d",
            wu_result.workunit.id, g_reply->user.id
        );
        retval = result.count(n, buf);
        if (retval) {
            log_messages.printf(MSG_CRITICAL,
                "send_work: can't get result count (%d)\n", retval
            );
            return true;
        }
        if (n > 0) {
            if (config.debug_send) {
                log_messages.printf(MSG_NORMAL,
                    "[send] send_work: user %d already has %d result(s) for WU %d\n",
                    g_reply->user.id, n, wu_result.workunit.id
                );
            }
            return true;
        }
    } else if (config.one_result_per_host_per_wu) {
        // Don't send if we've already sent a result
        // of this WU to this host.
        // We only have to check this
        // if we don't send one result per user.
        //
        sprintf(buf,
            "where workunitid=%d and hostid=%d",
            wu_result.workunit.id, g_reply->host.id
        );
        retval = result.count(n, buf);
        if (retval) {
            log_messages.printf(MSG_CRITICAL,
                "send_work: can't get result count (%d)\n", retval
            );
            return true;
        }
        if (n > 0) {
            if (config.debug_send) {
                log_messages.printf(MSG_NORMAL,
                    "[send] send_work: host %d already has %d result(s) for WU %d\n",
                    g_reply->host.id, n, wu_result.workunit.id
                );
            }
            return true;
        }
    }

    APP* app = ssp->lookup_app(wu_result.workunit.appid);
    WORKUNIT wu = wu_result.workunit;
    if (app_hr_type(*app)) {
        if (already_sent_to_different_platform_careful(wu, *app)) {
            if (config.debug_send) {
                log_messages.printf(MSG_NORMAL,
                    "[send] [HOST#%d] [WU#%d %s] WU is infeasible (assigned to different platform)\n",
                    g_reply->host.id, wu.id, wu.name
                );
            }
            // Mark the workunit as infeasible.
            // This ensures that jobs already assigned to a platform
            // are processed first.
            //
            wu_result.infeasible_count++;
            return true;
        }
    }
    return false;
}
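
// JOB_SET keeps its jobs list sorted by score, highest first
// (see add_job() below), so back() is always the lowest-scoring job.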
double JOB_SET::lowest_score() {
    if (jobs.empty()) return 0;
    return jobs.back().score;
}

// add the given job, and remove lowest-score jobs that
// - are in excess of work request
// - are in excess of per-request or per-day limits
// - cause the disk limit to be exceeded
//
void JOB_SET::add_job(JOB& job) {
    // evict lowest-score jobs until the new job fits in the work request
    while (!jobs.empty()) {
        JOB& worst_job = jobs.back();
        if (est_time + job.est_time - worst_job.est_time > work_req) {
            est_time -= worst_job.est_time;
            disk_usage -= worst_job.disk_usage;
            jobs.pop_back();
            ssp->wu_results[worst_job.index].state = WR_STATE_PRESENT;
        } else {
            break;
        }
    }
    // evict lowest-score jobs until the new job fits in the disk limit
    while (!jobs.empty()) {
        JOB& worst_job = jobs.back();
        if (disk_usage + job.disk_usage > disk_limit) {
            est_time -= worst_job.est_time;
            disk_usage -= worst_job.disk_usage;
            jobs.pop_back();
            ssp->wu_results[worst_job.index].state = WR_STATE_PRESENT;
        } else {
            break;
        }
    }

    // enforce the per-request job limit
    if ((int)jobs.size() == max_jobs) {
        JOB& worst_job = jobs.back();
        jobs.pop_back();
        ssp->wu_results[worst_job.index].state = WR_STATE_PRESENT;
    }

    // insert the new job in score order (highest first)
    std::list<JOB>::iterator i = jobs.begin();
    while (i != jobs.end()) {
        if (i->score < job.score) {
            jobs.insert(i, job);
            break;
        }
        i++;
    }
    if (i == jobs.end()) {
        jobs.push_back(job);
    }
    est_time += job.est_time;
    disk_usage += job.disk_usage;
    if (config.debug_send) {
        log_messages.printf(MSG_NORMAL,
            "[send] added job to set. est_time %.2f disk_usage %.2fGB\n",
            est_time, disk_usage/GIGA
        );
    }
}
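
// Note the eviction order above: work-request evictions, then disk-limit
// evictions, then the max_jobs cap, and only then the score-ordered insert;
// every evicted job's cache slot is returned to WR_STATE_PRESENT.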

// return the disk usage of jobs above the given score
//
double JOB_SET::higher_score_disk_usage(double v) {
    double sum = 0;
    std::list<JOB>::iterator i = jobs.begin();
    while (i != jobs.end()) {
        if (i->score < v) break;
        sum += i->disk_usage;
        i++;
    }
    return sum;
}
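
// e.g. higher_score_disk_usage(0.5) sums the disk footprint of queued
// jobs with score >= 0.5; the scan can stop at the first lower-scoring
// job because the list is sorted.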

void JOB_SET::send() {
    WORKUNIT wu;
    DB_RESULT result;
    int retval;

    std::list<JOB>::iterator i = jobs.begin();
    while (i != jobs.end()) {
        JOB& job = *(i++);
        WU_RESULT wu_result = ssp->wu_results[job.index];
        ssp->wu_results[job.index].state = WR_STATE_EMPTY;
        wu = wu_result.workunit;
        result.id = wu_result.resultid;
        retval = read_sendable_result(result);
        if (!retval) {
            add_result_to_reply(result, wu, job.bavp, false);
        }
    }
}
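
// Scan a window of the job cache, building a score-ordered JOB_SET of
// jobs we can send to this client, then send them. Locking: a cache
// slot whose state field holds a scheduler process's PID is locked by
// that process; we mark a slot with g_pid before the slow (DB) checks
// and restore WR_STATE_PRESENT if the job turns out to be unusable.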
void send_work_matchmaker() {
    int i, slots_locked=0, slots_nonempty=0;
    JOB_SET jobs;
    int min_slots = config.mm_min_slots;
    if (!min_slots) min_slots = ssp->max_wu_results/2;
    int max_slots = config.mm_max_slots;
    if (!max_slots) max_slots = ssp->max_wu_results;
    int max_locked = 10;

    lock_sema();
    i = rand() % ssp->max_wu_results;

    // scan through the job cache, maintaining a JOB_SET of jobs
    // that we can send to this client, ordered by score.
    //
    for (int slots_scanned=0; slots_scanned<max_slots; slots_scanned++) {
        i = (i+1) % ssp->max_wu_results;
        WU_RESULT& wu_result = ssp->wu_results[i];
        switch (wu_result.state) {
        case WR_STATE_EMPTY:
            continue;
        case WR_STATE_PRESENT:
            slots_nonempty++;
            break;
        default:
            slots_nonempty++;
            if (wu_result.state == g_pid) break;
            slots_locked++;
            continue;
        }

        JOB job;
        job.index = i;

        // get score for this job, and skip it if it fails quick check.
        // NOTE: the EDF check done in get_score()
        // includes only in-progress jobs.
        //
        if (!job.get_score()) {
            continue;
        }
        if (config.debug_send) {
            log_messages.printf(MSG_NORMAL,
                "[send] score for %s: %f\n", wu_result.workunit.name, job.score
            );
        }

        if (job.score > jobs.lowest_score() || !jobs.request_satisfied()) {
            ssp->wu_results[i].state = g_pid;
            unlock_sema();
            if (wu_is_infeasible_slow(wu_result, *g_request, *g_reply)) {
                // if we can't use this job, put it back in pool
                //
                lock_sema();
                ssp->wu_results[i].state = WR_STATE_PRESENT;
                continue;
            }
            lock_sema();
            jobs.add_job(job);
        }

        if (jobs.request_satisfied() && slots_scanned>=min_slots) break;
    }

    if (slots_nonempty) {
        g_wreq->no_jobs_available = false;
    } else {
        log_messages.printf(MSG_CRITICAL,
            "Job cache is empty - check feeder\n"
        );
    }

    // TODO: trim jobs from tail of list until we pass the EDF check
    //
    jobs.send();
    unlock_sema();

    if (slots_locked > max_locked) {
        log_messages.printf(MSG_CRITICAL,
            "Found too many locked slots (%d>%d) - increase array size\n",
            slots_locked, max_locked
        );
    }
}
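
// The mm_min_slots/mm_max_slots knobs read above are project config
// options. A sketch of the config.xml fragment (element names assumed
// to match the sched_config fields; values illustrative only):
//
//   <config>
//     <mm_min_slots>100</mm_min_slots>
//     <mm_max_slots>500</mm_max_slots>
//   </config>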