~ubuntu-branches/ubuntu/precise/boinc/precise

« back to all changes in this revision

Viewing changes to sched/sched_send.cpp

Tags: 6.12.8+dfsg-1
* New upstream release.
* Simplified debian/rules

Show diffs side-by-side

added added

removed removed

Lines of Context:
15
15
// You should have received a copy of the GNU Lesser General Public License
16
16
// along with BOINC.  If not, see <http://www.gnu.org/licenses/>.
17
17
 
18
 
// scheduler code related to sending jobs
 
18
// scheduler code related to sending jobs.
 
19
// NOTE: there should be nothing here specific to particular
 
20
// scheduling policies (array scan, matchmaking, locality)
19
21
 
20
22
#include "config.h"
21
23
#include <vector>
25
27
#include <cstdio>
26
28
#include <cstring>
27
29
#include <stdlib.h>
28
 
 
29
 
using namespace std;
30
 
 
 
30
#include <sys/time.h>
31
31
#include <unistd.h>
32
32
 
33
33
#include "error_numbers.h"
34
34
#include "parse.h"
35
35
#include "util.h"
36
36
#include "str_util.h"
 
37
#include "synch.h"
37
38
 
38
 
#include "server_types.h"
 
39
#include "credit.h"
 
40
#include "sched_types.h"
39
41
#include "sched_shmem.h"
40
42
#include "sched_config.h"
41
43
#include "sched_util.h"
42
 
#include "main.h"
 
44
#include "sched_main.h"
43
45
#include "sched_array.h"
44
46
#include "sched_msgs.h"
45
47
#include "sched_hr.h"
47
49
#include "sched_locality.h"
48
50
#include "sched_timezone.h"
49
51
#include "sched_assign.h"
50
 
#include "sched_plan.h"
 
52
#include "sched_customize.h"
 
53
#include "sched_version.h"
51
54
 
52
55
#include "sched_send.h"
53
56
 
55
58
#include "boinc_fcgi.h"
56
59
#endif
57
60
 
 
61
// if host sends us an impossible RAM size, use this instead
 
62
//
 
63
const double DEFAULT_RAM_SIZE = 64000000;
58
64
 
59
 
void send_work_matchmaker(SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply);
 
65
void send_work_matchmaker();
60
66
 
61
67
int preferred_app_message_index=0;
62
68
 
74
80
    return "Unknown";
75
81
}
76
82
 
77
 
const int MIN_SECONDS_TO_SEND = 0;
78
 
const int MAX_SECONDS_TO_SEND = (28*SECONDS_IN_DAY);
79
 
 
80
 
// return a number that
81
 
// - is the # of CPUs in EDF simulation
82
 
// - scales the daily result quota
83
 
// - scales max_wus_in_progress
84
 
 
85
 
inline int effective_ncpus(SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply) {
86
 
    int ncpus = reply.host.p_ncpus;
87
 
    if (ncpus > config.max_ncpus) ncpus = config.max_ncpus;
88
 
    if (ncpus < 1) ncpus = 1;
89
 
    if (config.have_cuda_apps) {
90
 
        COPROC* cp = sreq.coprocs.lookup("cuda");
91
 
        if (cp && cp->count > ncpus) {
92
 
            ncpus = cp->count;
93
 
        }
94
 
    }
95
 
    return ncpus;
96
 
}
97
 
 
98
 
const double DEFAULT_RAM_SIZE = 64000000;
99
 
    // if host sends us an impossible RAM size, use this instead
100
 
 
101
 
bool SCHEDULER_REQUEST::has_version(APP& app) {
102
 
    unsigned int i;
103
 
 
104
 
    for (i=0; i<client_app_versions.size(); i++) {
105
 
        CLIENT_APP_VERSION& cav = client_app_versions[i];
106
 
        if (!strcmp(cav.app_name, app.name) && cav.version_num >= app.min_version) {
107
 
            return true;
108
 
        }
109
 
    }
110
 
    return false;
111
 
}
112
 
 
113
 
// return BEST_APP_VERSION for the given host, or NULL if none
114
 
//
115
 
//
116
 
BEST_APP_VERSION* get_app_version(
117
 
    SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply, WORKUNIT& wu
118
 
) {
119
 
    bool found;
120
 
    unsigned int i;
121
 
    int j;
122
 
    BEST_APP_VERSION* bavp;
123
 
    char message[256];
124
 
 
125
 
    //
126
 
    // see if app is already in memoized array
127
 
    //
128
 
    for (i=0; i<reply.wreq.best_app_versions.size(); i++) {
129
 
        bavp = reply.wreq.best_app_versions[i];
130
 
        if (bavp->appid == wu.appid) {
131
 
            if (!bavp->avp) return NULL;
132
 
            return bavp;
133
 
        }
134
 
    }
135
 
 
136
 
    APP* app = ssp->lookup_app(wu.appid);
137
 
    if (!app) {
138
 
        log_messages.printf(MSG_CRITICAL, "WU refers to nonexistent app: %d\n", wu.appid);
139
 
        return NULL;
140
 
    }
141
 
 
142
 
    bavp = new BEST_APP_VERSION;
143
 
    bavp->appid = wu.appid;
144
 
    if (anonymous(sreq.platforms.list[0])) {
145
 
        found = sreq.has_version(*app);
146
 
        if (!found) {
147
 
            if (config.debug_send) {
148
 
                log_messages.printf(MSG_DEBUG,
149
 
                    "Didn't find anonymous platform app for %s\n", app->name
150
 
                );
151
 
                sprintf(message,
152
 
                    "Your app_info.xml file doesn't have a version of %s.",
153
 
                    app->user_friendly_name
154
 
                );
155
 
                USER_MESSAGE um(message, "high");
156
 
                reply.wreq.insert_no_work_message(um);
157
 
                reply.wreq.no_app_version = true;
158
 
            }
159
 
            bavp->avp = 0;
160
 
        } else {
161
 
            if (config.debug_send) {
162
 
                log_messages.printf(MSG_DEBUG,
163
 
                    "Found anonymous platform app for %s\n", app->name
164
 
                );
165
 
            }
166
 
            // TODO: anonymous platform apps should be able to tell us
167
 
            // how fast they are and how many CPUs and coprocs they use.
168
 
            // For now, assume they use 1 CPU
169
 
            //
170
 
            bavp->host_usage.sequential_app(reply.host.p_fpops);
171
 
            bavp->avp = (APP_VERSION*)1;    // arbitrary nonzero value;
172
 
                // means the client already has the app version
173
 
        }
174
 
        reply.wreq.best_app_versions.push_back(bavp);
175
 
        if (!bavp->avp) return NULL;
176
 
        return bavp;
177
 
    }
178
 
 
179
 
    // Go through the client's platforms.
180
 
    // Scan the app versions for each platform.
181
 
    // Find the one with highest expected FLOPS
182
 
    //
183
 
    bavp->host_usage.flops = 0;
184
 
    bavp->avp = NULL;
185
 
    for (i=0; i<sreq.platforms.list.size(); i++) {
186
 
        PLATFORM* p = sreq.platforms.list[i];
187
 
        for (j=0; j<ssp->napp_versions; j++) {
188
 
            HOST_USAGE host_usage;
189
 
            APP_VERSION& av = ssp->app_versions[j];
190
 
            if (av.appid != wu.appid) continue;
191
 
            if (av.platformid != p->id) continue;
192
 
            if (sreq.core_client_version < av.min_core_version) {
193
 
                log_messages.printf(MSG_NORMAL,
194
 
                    "outdated client version %d < min core version %d\n",
195
 
                    sreq.core_client_version, av.min_core_version
196
 
                );
197
 
                reply.wreq.outdated_core = true;
198
 
                continue;
199
 
            }
200
 
            if (strlen(av.plan_class)) {
201
 
                if (!sreq.client_cap_plan_class) continue;
202
 
                if (!app_plan(sreq, av.plan_class, host_usage)) {
203
 
                    continue;
204
 
                }
205
 
            } else {
206
 
                host_usage.sequential_app(reply.host.p_fpops);
207
 
            }
208
 
            if (host_usage.flops > bavp->host_usage.flops) {
209
 
                bavp->host_usage = host_usage;
210
 
                bavp->avp = &av;
211
 
            }
212
 
        }
213
 
    }
214
 
    reply.wreq.best_app_versions.push_back(bavp);
215
 
    if (bavp->avp) {
216
 
        if (config.debug_version_select) {
217
 
            log_messages.printf(MSG_DEBUG,
218
 
                "Best version of app %s is %d (%.2f GFLOPS)\n",
219
 
                app->name, bavp->avp->id, bavp->host_usage.flops/1e9
220
 
            );
221
 
        }
 
83
const double MIN_REQ_SECS = 0;
 
84
const double MAX_REQ_SECS = (28*SECONDS_IN_DAY);
 
85
 
 
86
const int MAX_GPUS = 8;
 
87
    // don't believe clients who claim they have more GPUs than this
 
88
 
 
89
// get limits on:
 
90
// # jobs per day
 
91
// # jobs per RPC
 
92
// # jobs in progress
 
93
//
 
94
void WORK_REQ::get_job_limits() {
 
95
    int n;
 
96
    n = g_reply->host.p_ncpus;
 
97
    if (g_request->global_prefs.max_ncpus_pct && g_request->global_prefs.max_ncpus_pct < 100) {
 
98
        n = (int)((n*g_request->global_prefs.max_ncpus_pct)/100.);
 
99
    }
 
100
    if (n > config.max_ncpus) n = config.max_ncpus;
 
101
    if (n < 1) n = 1;
 
102
    effective_ncpus = n;
 
103
 
 
104
    n = g_request->coprocs.cuda.count + g_request->coprocs.ati.count;
 
105
    if (n > MAX_GPUS) n = MAX_GPUS;
 
106
    effective_ngpus = n;
 
107
 
 
108
    int mult = effective_ncpus + config.gpu_multiplier * effective_ngpus;
 
109
    if (config.non_cpu_intensive) {
 
110
        mult = 1;
 
111
        effective_ncpus = 1;
 
112
        if (effective_ngpus) effective_ngpus = 1;
 
113
    }
 
114
 
 
115
    if (config.max_wus_to_send) {
 
116
        g_wreq->max_jobs_per_rpc = mult * config.max_wus_to_send;
222
117
    } else {
223
 
        // here if no app version exists
224
 
        //
225
 
        if (config.debug_version_select) {
226
 
            log_messages.printf(MSG_DEBUG,
227
 
                "no app version available: APP#%d PLATFORM#%d min_version %d\n",
228
 
                app->id, sreq.platforms.list[0]->id, app->min_version
229
 
            );
230
 
        }
231
 
        sprintf(message,
232
 
            "%s is not available for your type of computer.",
233
 
            app->user_friendly_name
 
118
        g_wreq->max_jobs_per_rpc = 999999;
 
119
    }
 
120
 
 
121
    config.max_jobs_in_progress.reset(g_reply->host, g_request->coprocs);
 
122
 
 
123
    if (config.debug_quota) {
 
124
        log_messages.printf(MSG_NORMAL,
 
125
            "[quota] max jobs per RPC: %d\n",
 
126
            g_wreq->max_jobs_per_rpc
234
127
        );
235
 
        USER_MESSAGE um(message, "high");
236
 
        reply.wreq.insert_no_work_message(um);
237
 
        reply.wreq.no_app_version = true;
238
 
        return NULL;
 
128
        config.max_jobs_in_progress.print_log();
239
129
    }
240
 
    return bavp;
241
130
}
242
131
 
243
132
static const char* find_user_friendly_name(int appid) {
244
 
        APP* app = ssp->lookup_app(appid);
245
 
        if (app) return app->user_friendly_name;
 
133
    APP* app = ssp->lookup_app(appid);
 
134
    if (app) return app->user_friendly_name;
246
135
    return "deprecated application";
247
136
}
248
137
 
252
141
// - d_total and d_free (pre 4 oct 2005)
253
142
// - the above plus d_boinc_used_total and d_boinc_used_project
254
143
//
255
 
double max_allowable_disk(SCHEDULER_REQUEST& req, SCHEDULER_REPLY& reply) {
256
 
    HOST host = req.host;
257
 
    GLOBAL_PREFS prefs = req.global_prefs;
 
144
double max_allowable_disk() {
 
145
    HOST host = g_request->host;
 
146
    GLOBAL_PREFS prefs = g_request->global_prefs;
258
147
    double x1, x2, x3, x;
259
148
 
260
149
    // defaults are from config.xml
285
174
        x2 = host.d_total*prefs.disk_max_used_pct/100.
286
175
            - host.d_boinc_used_total;
287
176
        x3 = host.d_free - prefs.disk_min_free_gb*GIGA;      // may be negative
288
 
        x = min(x1, min(x2, x3));
 
177
        x = std::min(x1, std::min(x2, x3));
289
178
 
290
179
        // see which bound is the most stringent
291
180
        //
292
181
        if (x==x1) {
293
 
            reply.disk_limits.max_used = x;
 
182
            g_reply->disk_limits.max_used = x;
294
183
        } else if (x==x2) {
295
 
            reply.disk_limits.max_frac = x;
 
184
            g_reply->disk_limits.max_frac = x;
296
185
        } else {
297
 
            reply.disk_limits.min_free = x;
 
186
            g_reply->disk_limits.min_free = x;
298
187
        }
299
188
    } else {
300
189
        // here we don't know how much space BOINC is using.
304
193
        // We can only honor the min_free pref.
305
194
        //
306
195
        x = host.d_free - prefs.disk_min_free_gb*GIGA;      // may be negative
307
 
        reply.disk_limits.min_free = x;
 
196
        g_reply->disk_limits.min_free = x;
308
197
        x1 = x2 = x3 = 0;
309
198
    }
310
199
 
311
200
    if (x < 0) {
312
201
        if (config.debug_send) {
313
 
            log_messages.printf(MSG_DEBUG,
314
 
                "Insufficient disk: disk_max_used_gb %f disk_max_used_pct %f disk_min_free_gb %f\n",
315
 
                prefs.disk_max_used_gb, prefs.disk_max_used_pct,
316
 
                prefs.disk_min_free_gb
317
 
            );
318
 
            log_messages.printf(MSG_DEBUG,
319
 
                "Insufficient disk: host.d_total %f host.d_free %f host.d_boinc_used_total %f\n",
320
 
                host.d_total, host.d_free, host.d_boinc_used_total
321
 
            );
322
 
            log_messages.printf(MSG_DEBUG,
323
 
                "Insufficient disk: x1 %f x2 %f x3 %f x %f\n",
324
 
                x1, x2, x3, x
 
202
            log_messages.printf(MSG_NORMAL,
 
203
                "[send] No disk space available: disk_max_used_gb %.2fGB disk_max_used_pct %.2f disk_min_free_gb %.2fGB\n",
 
204
                prefs.disk_max_used_gb/GIGA,
 
205
                prefs.disk_max_used_pct,
 
206
                prefs.disk_min_free_gb/GIGA
 
207
            );
 
208
            log_messages.printf(MSG_NORMAL,
 
209
                "[send] No disk space available: host.d_total %.2fGB host.d_free %.2fGB host.d_boinc_used_total %.2fGB\n",
 
210
                host.d_total/GIGA,
 
211
                host.d_free/GIGA,
 
212
                host.d_boinc_used_total/GIGA
 
213
            );
 
214
            log_messages.printf(MSG_NORMAL,
 
215
                "[send] No disk space available: x1 %.2fGB x2 %.2fGB x3 %.2fGB x %.2fGB\n",
 
216
                x1/GIGA, x2/GIGA, x3/GIGA, x/GIGA
325
217
            );
326
218
        }
327
 
        reply.wreq.disk.set_insufficient(-x);
 
219
        g_wreq->disk.set_insufficient(-x);
 
220
        x = 0;
328
221
    }
329
222
    return x;
330
223
}
331
224
 
332
 
// if a host has active_frac < 0.1, assume 0.1 so we don't deprive it of work.
333
 
//
334
 
const double HOST_ACTIVE_FRAC_MIN = 0.1;
335
 
 
336
 
// estimate the number of CPU seconds that a workunit requires
337
 
// running on this host.
338
 
//
339
 
double estimate_cpu_duration(WORKUNIT& wu, SCHEDULER_REPLY& reply) {
340
 
    double p_fpops = reply.host.p_fpops;
341
 
    if (p_fpops <= 0) p_fpops = 1e9;
 
225
static double estimate_duration_unscaled(WORKUNIT& wu, BEST_APP_VERSION& bav) {
342
226
    double rsc_fpops_est = wu.rsc_fpops_est;
343
227
    if (rsc_fpops_est <= 0) rsc_fpops_est = 1e12;
344
 
    return rsc_fpops_est/p_fpops;
 
228
    return rsc_fpops_est/bav.host_usage.projected_flops;
 
229
}
 
230
 
 
231
static inline void get_running_frac() {
 
232
    double rf;
 
233
    if (g_request->core_client_version<=41900) {
 
234
        rf = g_reply->host.on_frac;
 
235
    } else {
 
236
        rf = g_reply->host.active_frac * g_reply->host.on_frac;
 
237
    }
 
238
 
 
239
    // clamp running_frac to a reasonable range
 
240
    //
 
241
    if (rf > 1) {
 
242
        if (config.debug_send) {
 
243
            log_messages.printf(MSG_NORMAL, "[send] running_frac=%f; setting to 1\n", rf);
 
244
        }
 
245
        rf = 1;
 
246
    } else if (rf < .1) {
 
247
        if (config.debug_send) {
 
248
            log_messages.printf(MSG_NORMAL, "[send] running_frac=%f; setting to 0.1\n", rf);
 
249
        }
 
250
        rf = .1;
 
251
    }
 
252
    g_wreq->running_frac = rf;
345
253
}
346
254
 
347
255
// estimate the amount of real time to complete this WU,
348
256
// taking into account active_frac etc.
349
257
// Note: don't factor in resource_share_fraction.
350
 
// The core client no longer necessarily does round-robin
351
 
// across all projects.
 
258
// The core client doesn't necessarily round-robin across all projects.
352
259
//
353
 
static double estimate_wallclock_duration(
354
 
    WORKUNIT& wu, SCHEDULER_REQUEST&, SCHEDULER_REPLY& reply
355
 
) {
356
 
    double ecd = estimate_cpu_duration(wu, reply);
357
 
    double ewd = ecd/reply.wreq.running_frac;
358
 
    if (reply.host.duration_correction_factor) {
359
 
        ewd *= reply.host.duration_correction_factor;
360
 
    }
 
260
double estimate_duration(WORKUNIT& wu, BEST_APP_VERSION& bav) {
 
261
    double edu = estimate_duration_unscaled(wu, bav);
 
262
    double ed = edu/g_wreq->running_frac;
361
263
    if (config.debug_send) {
362
 
        log_messages.printf(MSG_DEBUG,
363
 
            "est cpu dur %f;  est wall dur %f\n", ecd, ewd
 
264
        log_messages.printf(MSG_NORMAL,
 
265
            "[send] est. duration for WU %d: unscaled %.2f scaled %.2f\n",
 
266
            wu.id, edu, ed
364
267
        );
365
268
    }
366
 
    return ewd;
 
269
    return ed;
367
270
}
368
271
 
369
 
// Find or compute various info about the host;
370
 
// this info affects which jobs are sent to the host.
371
 
//
372
 
static int get_host_info(SCHEDULER_REPLY& reply) {
 
272
static void get_prefs_info() {
373
273
    char buf[8096];
374
 
    string str;
 
274
    std::string str;
375
275
    unsigned int pos = 0;
376
 
    int temp_int;
 
276
    int temp_int=0;
377
277
    bool flag;
378
278
 
379
 
    extract_venue(reply.user.project_prefs, reply.host.venue, buf);
 
279
    extract_venue(g_reply->user.project_prefs, g_reply->host.venue, buf);
380
280
    str = buf;
381
281
 
382
282
    // scan user's project prefs for elements of the form <app_id>N</app_id>,
383
283
    // indicating the apps they want to run.
384
284
    //
385
 
    reply.wreq.host_info.preferred_apps.clear();
 
285
    g_wreq->preferred_apps.clear();
386
286
    while (parse_int(str.substr(pos,str.length()-pos).c_str(), "<app_id>", temp_int)) {
387
287
        APP_INFO ai;
388
288
        ai.appid = temp_int;
389
289
        ai.work_available = false;
390
 
        reply.wreq.host_info.preferred_apps.push_back(ai);
 
290
        g_wreq->preferred_apps.push_back(ai);
391
291
 
392
292
        pos = str.find("<app_id>", pos) + 1;
393
293
    }
394
 
        if (parse_bool(buf,"allow_non_preferred_apps", flag)) {
395
 
            reply.wreq.host_info.allow_non_preferred_apps = flag;
396
 
    }
397
 
        if (parse_bool(buf,"allow_beta_work", flag)) {
398
 
        reply.wreq.host_info.allow_beta_work = flag;
399
 
        }
400
 
 
401
 
    // Decide whether or not this computer is 'reliable'
402
 
    // A computer is reliable if the following conditions are true
403
 
    // (for those that are set in the config file)
404
 
    // 1) The host average turnaround is less than the config
405
 
    // max average turnaround
406
 
    // 2) The host error rate is less then the config max error rate
407
 
    // 3) The host results per day is equal to the config file value
408
 
    //
409
 
    double expavg_credit = reply.host.expavg_credit;
410
 
    double expavg_time = reply.host.expavg_time;
411
 
    update_average(0, 0, CREDIT_HALF_LIFE, expavg_credit, expavg_time);
412
 
 
413
 
        // Platforms other then Windows, Linux and Intel Macs need a
 
294
    if (parse_bool(buf,"allow_non_preferred_apps", flag)) {
 
295
        g_wreq->allow_non_preferred_apps = flag;
 
296
    }
 
297
    if (parse_bool(buf,"allow_beta_work", flag)) {
 
298
        g_wreq->allow_beta_work = flag;
 
299
    }
 
300
    if (parse_bool(buf,"no_gpus", flag)) {
 
301
        // deprecated, but need to handle
 
302
        if (flag) {
 
303
            g_wreq->no_cuda = true;
 
304
            g_wreq->no_ati = true;
 
305
        }
 
306
    }
 
307
    if (parse_bool(buf,"no_cpu", flag)) {
 
308
        g_wreq->no_cpu = flag;
 
309
    }
 
310
    if (parse_bool(buf,"no_cuda", flag)) {
 
311
        g_wreq->no_cuda = flag;
 
312
    }
 
313
    if (parse_bool(buf,"no_ati", flag)) {
 
314
        g_wreq->no_ati = flag;
 
315
    }
 
316
}
 
317
 
 
318
// Decide whether or not this app version is 'reliable'
 
319
// An app version is reliable if the following conditions are true
 
320
// (for those that are set in the config file)
 
321
// 1) The host average turnaround is less than a threshold
 
322
// 2) consecutive_valid is above a threshold
 
323
// 3) The host results per day is equal to the max value
 
324
//
 
325
void get_reliability_version(HOST_APP_VERSION& hav, double multiplier) {
 
326
    if (hav.turnaround.n > MIN_HOST_SAMPLES && config.reliable_max_avg_turnaround) {
 
327
 
 
328
        if (hav.turnaround.get_avg() > config.reliable_max_avg_turnaround*multiplier) {
 
329
            if (config.debug_send) {
 
330
                log_messages.printf(MSG_NORMAL,
 
331
                    "[send] [AV#%d] not reliable; avg turnaround: %.3f > %.3f hrs\n",
 
332
                    hav.app_version_id,
 
333
                    hav.turnaround.get_avg()/3600,
 
334
                    config.reliable_max_avg_turnaround*multiplier/3600
 
335
                );
 
336
            }
 
337
            hav.reliable = false;
 
338
            return;
 
339
        }
 
340
    }
 
341
    if (hav.consecutive_valid < CONS_VALID_RELIABLE) {
 
342
        if (config.debug_send) {
 
343
            log_messages.printf(MSG_NORMAL,
 
344
                "[send] [AV#%d] not reliable; cons valid %d < %d\n",
 
345
                hav.app_version_id,
 
346
                hav.consecutive_valid, CONS_VALID_RELIABLE
 
347
            );
 
348
        }
 
349
        hav.reliable = false;
 
350
        return;
 
351
    }
 
352
    if (config.daily_result_quota) {
 
353
        if (hav.max_jobs_per_day < config.daily_result_quota) {
 
354
            if (config.debug_send) {
 
355
                log_messages.printf(MSG_NORMAL,
 
356
                    "[send] [AV#%d] not reliable; max_jobs_per_day %d>%d\n",
 
357
                    hav.app_version_id,
 
358
                    hav.max_jobs_per_day,
 
359
                    config.daily_result_quota
 
360
                );
 
361
            }
 
362
            hav.reliable = false;
 
363
            return;
 
364
        }
 
365
    }
 
366
    hav.reliable = true;
 
367
    if (config.debug_send) {
 
368
        log_messages.printf(MSG_NORMAL,
 
369
            "[send] [HOST#%d] app version %d is reliable\n",
 
370
            g_reply->host.id, hav.app_version_id
 
371
        );
 
372
    }
 
373
    g_wreq->has_reliable_version = true;
 
374
}
 
375
 
 
376
// decide whether do unreplicated jobs with this app version
 
377
//
 
378
static void set_trust(DB_HOST_APP_VERSION& hav) {
 
379
    hav.trusted = false;
 
380
    if (hav.consecutive_valid < CONS_VALID_UNREPLICATED) {
 
381
        if (config.debug_send) {
 
382
            log_messages.printf(MSG_NORMAL,
 
383
                "[send] set_trust: cons valid %d < %d, don't use single replication\n",
 
384
                hav.consecutive_valid, CONS_VALID_UNREPLICATED
 
385
            );
 
386
        }
 
387
        return;
 
388
    }
 
389
    double x = 1./hav.consecutive_valid;
 
390
    if (drand() > x) hav.trusted = true;
 
391
    if (config.debug_send) {
 
392
        log_messages.printf(MSG_NORMAL,
 
393
            "[send] set_trust: random choice for cons valid %d: %s\n",
 
394
            hav.consecutive_valid, hav.trusted?"yes":"no"
 
395
        );
 
396
    }
 
397
}
 
398
 
 
399
static void update_quota(DB_HOST_APP_VERSION& hav) {
 
400
    if (config.daily_result_quota) {
 
401
        if (hav.max_jobs_per_day == 0) {
 
402
            hav.max_jobs_per_day = config.daily_result_quota;
 
403
            if (config.debug_quota) {
 
404
                log_messages.printf(MSG_NORMAL,
 
405
                    "[quota] [HAV#%d] Initializing max_results_day to %d\n",
 
406
                    hav.app_version_id,
 
407
                    config.daily_result_quota
 
408
                );
 
409
            }
 
410
        }
 
411
    }
 
412
 
 
413
    if (g_request->last_rpc_dayofyear != g_request->current_rpc_dayofyear) {
 
414
        if (config.debug_quota) {
 
415
            log_messages.printf(MSG_NORMAL,
 
416
                "[quota] [HOST#%d] [HAV#%d] Resetting n_jobs_today\n",
 
417
                g_reply->host.id, hav.app_version_id
 
418
            );
 
419
        }
 
420
        hav.n_jobs_today = 0;
 
421
    }
 
422
}
 
423
 
 
424
void update_n_jobs_today() {
 
425
    for (unsigned int i=0; i<g_wreq->host_app_versions.size(); i++) {
 
426
        DB_HOST_APP_VERSION& hav = g_wreq->host_app_versions[i];
 
427
        update_quota(hav);
 
428
    }
 
429
}
 
430
 
 
431
static void get_reliability_and_trust() {
 
432
    // Platforms other than Windows, Linux and Intel Macs need a
414
433
    // larger set of computers to be marked reliable
415
434
    //
416
435
    double multiplier = 1.0;
417
 
    if (strstr(reply.host.os_name,"Windows")
418
 
        || strstr(reply.host.os_name,"Linux")
419
 
        || (strstr(reply.host.os_name,"Darwin")
420
 
            && !(strstr(reply.host.p_vendor,"Power Macintosh"))
 
436
    if (strstr(g_reply->host.os_name,"Windows")
 
437
        || strstr(g_reply->host.os_name,"Linux")
 
438
        || (strstr(g_reply->host.os_name,"Darwin")
 
439
            && !(strstr(g_reply->host.p_vendor,"Power Macintosh"))
421
440
    )) {
422
 
        multiplier = 1.0;
 
441
        multiplier = 1.0;
423
442
    } else {
424
 
        multiplier = 1.8;
 
443
        multiplier = 1.8;
425
444
    }
426
445
 
427
 
    if ((config.reliable_max_avg_turnaround == 0 || reply.host.avg_turnaround < config.reliable_max_avg_turnaround*multiplier)
428
 
        && (config.reliable_max_error_rate == 0 || reply.host.error_rate < config.reliable_max_error_rate*multiplier)
429
 
        && (config.daily_result_quota == 0 || reply.host.max_results_day >= config.daily_result_quota)
430
 
     ) {
431
 
        reply.wreq.host_info.reliable = true;
432
 
    }
433
 
    if (config.debug_send) {
434
 
        log_messages.printf(MSG_DEBUG,
435
 
            "[HOST#%d] is%s reliable (OS = %s) error_rate = %.6f avg_turn_hrs = %.3f \n",
436
 
            reply.host.id,
437
 
            reply.wreq.host_info.reliable?"":" not",
438
 
            reply.host.os_name, reply.host.error_rate,
439
 
            reply.host.avg_turnaround/3600
440
 
        );
441
 
    }
442
 
    return 0;
 
446
    for (unsigned int i=0; i<g_wreq->host_app_versions.size(); i++) {
 
447
        DB_HOST_APP_VERSION& hav = g_wreq->host_app_versions[i];
 
448
        get_reliability_version(hav, multiplier);
 
449
        set_trust(hav);
 
450
    }
443
451
}
444
452
 
445
453
// Return true if the user has set application preferences,
446
454
// and this job is not for a selected app
447
455
//
448
 
bool app_not_selected(
449
 
    WORKUNIT& wu, SCHEDULER_REQUEST& , SCHEDULER_REPLY& reply
450
 
) {
 
456
bool app_not_selected(WORKUNIT& wu) {
451
457
    unsigned int i;
452
458
 
453
 
    if (reply.wreq.host_info.preferred_apps.size() == 0) return false;
454
 
    for (i=0; i<reply.wreq.host_info.preferred_apps.size(); i++) {
455
 
        if (wu.appid == reply.wreq.host_info.preferred_apps[i].appid) {
456
 
            reply.wreq.host_info.preferred_apps[i].work_available = true;
 
459
    if (g_wreq->preferred_apps.size() == 0) return false;
 
460
    for (i=0; i<g_wreq->preferred_apps.size(); i++) {
 
461
        if (wu.appid == g_wreq->preferred_apps[i].appid) {
 
462
            g_wreq->preferred_apps[i].work_available = true;
457
463
            return false;
458
464
        }
459
465
    }
461
467
}
462
468
 
463
469
// see how much RAM we can use on this machine
464
 
// TODO: compute this once, not once per job
465
470
//
466
 
static inline void get_mem_sizes(
467
 
    SCHEDULER_REQUEST& request, SCHEDULER_REPLY& reply,
468
 
    double& ram, double& usable_ram
469
 
) {
470
 
    ram = reply.host.m_nbytes;
471
 
    if (ram <= 0) ram = DEFAULT_RAM_SIZE;
472
 
    usable_ram = ram;
473
 
    double busy_frac = request.global_prefs.ram_max_used_busy_frac;
474
 
    double idle_frac = request.global_prefs.ram_max_used_idle_frac;
 
471
static inline void get_mem_sizes() {
 
472
    g_wreq->ram = g_reply->host.m_nbytes;
 
473
    if (g_wreq->ram <= 0) g_wreq->ram = DEFAULT_RAM_SIZE;
 
474
    g_wreq->usable_ram = g_wreq->ram;
 
475
    double busy_frac = g_request->global_prefs.ram_max_used_busy_frac;
 
476
    double idle_frac = g_request->global_prefs.ram_max_used_idle_frac;
475
477
    double frac = 1;
476
478
    if (busy_frac>0 && idle_frac>0) {
477
479
        frac = std::max(busy_frac, idle_frac);
478
480
        if (frac > 1) frac = 1;
479
 
        usable_ram *= frac;
 
481
        g_wreq->usable_ram *= frac;
480
482
    }
481
483
}
482
484
 
483
 
static inline int check_memory(
484
 
    WORKUNIT& wu, SCHEDULER_REQUEST& request, SCHEDULER_REPLY& reply
485
 
) {
486
 
    double ram, usable_ram;
487
 
    get_mem_sizes(request, reply, ram, usable_ram);
488
 
 
489
 
    double diff = wu.rsc_memory_bound - usable_ram;
 
485
static inline int check_memory(WORKUNIT& wu) {
 
486
    double diff = wu.rsc_memory_bound - g_wreq->usable_ram;
490
487
    if (diff > 0) {
491
488
        char message[256];
492
489
        sprintf(message,
493
490
            "%s needs %0.2f MB RAM but only %0.2f MB is available for use.",
494
491
            find_user_friendly_name(wu.appid),
495
 
            wu.rsc_memory_bound/MEGA, usable_ram/MEGA
 
492
            wu.rsc_memory_bound/MEGA, g_wreq->usable_ram/MEGA
496
493
        );
497
 
        USER_MESSAGE um(message,"high");
498
 
        reply.wreq.insert_no_work_message(um);
499
 
        
 
494
        add_no_work_message(message);
 
495
 
500
496
        if (config.debug_send) {
501
 
            log_messages.printf(MSG_DEBUG,
502
 
                "[WU#%d %s] needs %0.2fMB RAM; [HOST#%d] has %0.2fMB, %0.2fMB usable\n",
 
497
            log_messages.printf(MSG_NORMAL,
 
498
                "[send] [WU#%d %s] needs %0.2fMB RAM; [HOST#%d] has %0.2fMB, %0.2fMB usable\n",
503
499
                wu.id, wu.name, wu.rsc_memory_bound/MEGA,
504
 
                reply.host.id, ram/MEGA, usable_ram/MEGA
 
500
                g_reply->host.id, g_wreq->ram/MEGA, g_wreq->usable_ram/MEGA
505
501
            );
506
502
        }
507
 
        reply.wreq.mem.set_insufficient(wu.rsc_memory_bound);
508
 
        reply.set_delay(DELAY_NO_WORK_TEMP);
 
503
        g_wreq->mem.set_insufficient(wu.rsc_memory_bound);
 
504
        g_reply->set_delay(DELAY_NO_WORK_TEMP);
509
505
        return INFEASIBLE_MEM;
510
506
    }
511
507
    return 0;
512
508
}
513
509
 
514
 
static inline int check_disk(
515
 
    WORKUNIT& wu, SCHEDULER_REQUEST& , SCHEDULER_REPLY& reply
516
 
) {
517
 
    double diff = wu.rsc_disk_bound - reply.wreq.disk_available;
 
510
static inline int check_disk(WORKUNIT& wu) {
 
511
    double diff = wu.rsc_disk_bound - g_wreq->disk_available;
518
512
    if (diff > 0) {
519
513
        char message[256];
520
514
        sprintf(message,
521
515
            "%s needs %0.2fMB more disk space.  You currently have %0.2f MB available and it needs %0.2f MB.",
522
516
            find_user_friendly_name(wu.appid),
523
 
            diff/MEGA, reply.wreq.disk_available/MEGA, wu.rsc_disk_bound/MEGA
 
517
            diff/MEGA, g_wreq->disk_available/MEGA, wu.rsc_disk_bound/MEGA
524
518
        );
525
 
        USER_MESSAGE um(message,"high");
526
 
        reply.wreq.insert_no_work_message(um);
 
519
        add_no_work_message(message);
527
520
 
528
 
        reply.wreq.disk.set_insufficient(diff);
 
521
        g_wreq->disk.set_insufficient(diff);
529
522
        return INFEASIBLE_DISK;
530
523
    }
531
524
    return 0;
532
525
}
533
526
 
534
 
static inline int check_bandwidth(
535
 
    WORKUNIT& wu, SCHEDULER_REQUEST& , SCHEDULER_REPLY& reply
536
 
) {
 
527
static inline int check_bandwidth(WORKUNIT& wu) {
537
528
    if (wu.rsc_bandwidth_bound == 0) return 0;
538
 
    
 
529
 
539
530
    // if n_bwdown is zero, the host has never downloaded anything,
540
531
    // so skip this check
541
532
    //
542
 
    if (reply.host.n_bwdown == 0) return 0;
 
533
    if (g_reply->host.n_bwdown == 0) return 0;
543
534
 
544
 
    double diff = wu.rsc_bandwidth_bound - reply.host.n_bwdown;
 
535
    double diff = wu.rsc_bandwidth_bound - g_reply->host.n_bwdown;
545
536
    if (diff > 0) {
546
537
        char message[256];
547
538
        sprintf(message,
548
539
            "%s requires %0.2f KB/sec download bandwidth.  Your computer has been measured at %0.2f KB/sec.",
549
540
            find_user_friendly_name(wu.appid),
550
 
            wu.rsc_bandwidth_bound/KILO, reply.host.n_bwdown/KILO
 
541
            wu.rsc_bandwidth_bound/KILO, g_reply->host.n_bwdown/KILO
551
542
        );
552
 
        USER_MESSAGE um(message,"high");
553
 
        reply.wreq.insert_no_work_message(um);
 
543
        add_no_work_message(message);
554
544
 
555
 
        reply.wreq.bandwidth.set_insufficient(diff);
 
545
        g_wreq->bandwidth.set_insufficient(diff);
556
546
        return INFEASIBLE_BANDWIDTH;
557
547
    }
558
548
    return 0;
567
557
    return (app.weight == -1);
568
558
}
569
559
 
 
560
static inline double get_estimated_delay(BEST_APP_VERSION& bav) {
 
561
    if (bav.host_usage.ncudas) {
 
562
        return g_request->coprocs.cuda.estimated_delay;
 
563
    } else if (bav.host_usage.natis) {
 
564
        return g_request->coprocs.ati.estimated_delay;
 
565
    } else {
 
566
        return g_request->cpu_estimated_delay;
 
567
    }
 
568
}
 
569
 
 
570
static inline void update_estimated_delay(BEST_APP_VERSION& bav, double dt) {
 
571
    if (bav.host_usage.ncudas) {
 
572
        g_request->coprocs.cuda.estimated_delay += dt;
 
573
    } else if (bav.host_usage.natis) {
 
574
        g_request->coprocs.ati.estimated_delay += dt;
 
575
    } else {
 
576
        g_request->cpu_estimated_delay += dt;
 
577
    }
 
578
}
 
579
 
 
580
// return the delay bound to use for this job/host.
 
581
// Actually, return two: optimistic (lower) and pessimistic (higher).
 
582
// If the deadline check with the optimistic bound fails,
 
583
// try the pessimistic bound.
 
584
//
 
585
static void get_delay_bound_range(
 
586
    WORKUNIT& wu,
 
587
    int res_server_state, int res_priority, double res_report_deadline,
 
588
    BEST_APP_VERSION& bav,
 
589
    double& opt, double& pess
 
590
) {
 
591
    if (res_server_state == RESULT_SERVER_STATE_IN_PROGRESS) {
 
592
        double now = dtime();
 
593
        if (res_report_deadline < now) {
 
594
            // if original deadline has passed, return zeros
 
595
            // This will skip deadline check.
 
596
            opt = pess = 0;
 
597
        }
 
598
        opt = res_report_deadline - now;
 
599
        pess = wu.delay_bound;
 
600
    } else {
 
601
        opt = pess = wu.delay_bound;
 
602
 
 
603
        // If the workunit needs reliable and is being sent to a reliable host,
 
604
        // then shorten the delay bound by the percent specified
 
605
        //
 
606
        if (config.reliable_on_priority && res_priority >= config.reliable_on_priority && config.reliable_reduced_delay_bound > 0.01
 
607
        ) {
 
608
            opt = wu.delay_bound*config.reliable_reduced_delay_bound;
 
609
            double est_wallclock_duration = estimate_duration(wu, bav);
 
610
            // Check to see how reasonable this reduced time is.
 
611
            // Increase it to twice the estimated delay bound
 
612
            // if all the following apply:
 
613
            //
 
614
            // 1) Twice the estimate is longer then the reduced delay bound
 
615
            // 2) Twice the estimate is less then the original delay bound
 
616
            // 3) Twice the estimate is less then the twice the reduced delay bound
 
617
            if (est_wallclock_duration*2 > opt
 
618
                && est_wallclock_duration*2 < wu.delay_bound
 
619
                && est_wallclock_duration*2 < wu.delay_bound*config.reliable_reduced_delay_bound*2
 
620
            ) {
 
621
                opt = est_wallclock_duration*2;
 
622
            }
 
623
        }
 
624
    }
 
625
}
 
626
 
 
627
// return 0 if the job, with the given delay bound,
 
628
// will complete by its deadline, and won't cause other jobs to miss deadlines.
 
629
//
570
630
static inline int check_deadline(
571
 
    WORKUNIT& wu, APP& app, SCHEDULER_REQUEST& request, SCHEDULER_REPLY& reply
 
631
    WORKUNIT& wu, APP& app, BEST_APP_VERSION& bav
572
632
) {
573
633
    if (config.ignore_delay_bound) return 0;
574
634
 
576
636
    // and it's not a hard app.
577
637
    // (i.e. everyone gets one result, no matter how slow they are)
578
638
    //
579
 
    if (request.estimated_delay == 0 && !hard_app(app)) return 0;
 
639
    if (get_estimated_delay(bav) == 0 && !hard_app(app)) {
 
640
        if (config.debug_send) {
 
641
            log_messages.printf(MSG_NORMAL,
 
642
                "[send] est delay 0, skipping deadline check\n"
 
643
            );
 
644
        }
 
645
        return 0;
 
646
    }
580
647
 
581
648
    // if it's a hard app, don't send it to a host with no credit
582
649
    //
583
 
    if (hard_app(app) && reply.host.total_credit == 0) {
 
650
    if (hard_app(app) && g_reply->host.total_credit == 0) {
584
651
        return INFEASIBLE_CPU;
585
652
    }
586
653
 
587
 
    double ewd = estimate_wallclock_duration(wu, request, reply);
588
 
    if (hard_app(app)) ewd *= 1.3;
589
 
    double est_completion_delay = request.estimated_delay + ewd;
590
 
    double est_report_delay = max(est_completion_delay, request.global_prefs.work_buf_min());
591
 
    double diff = est_report_delay - wu.delay_bound;
592
 
    if (diff > 0) {
593
 
        if (config.debug_send) {
594
 
            log_messages.printf(MSG_DEBUG,
595
 
                "[WU#%d %s] est report delay %d on [HOST#%d]; delay_bound is %d\n",
596
 
                wu.id, wu.name, (int)est_report_delay,
597
 
                reply.host.id, wu.delay_bound
598
 
            );
599
 
        }
600
 
        reply.wreq.speed.set_insufficient(diff);
601
 
        return INFEASIBLE_CPU;
 
654
    // do EDF simulation if possible; else use cruder approximation
 
655
    //
 
656
    if (config.workload_sim && g_request->have_other_results_list) {
 
657
        double est_dur = estimate_duration(wu, bav);
 
658
        if (g_reply->wreq.edf_reject_test(est_dur, wu.delay_bound)) {
 
659
            return INFEASIBLE_WORKLOAD;
 
660
        }
 
661
        IP_RESULT candidate("", wu.delay_bound, est_dur);
 
662
        strcpy(candidate.name, wu.name);
 
663
        if (check_candidate(candidate, g_wreq->effective_ncpus, g_request->ip_results)) {
 
664
            // it passed the feasibility test,
 
665
            // but don't add it to the workload yet;
 
666
            // wait until we commit to sending it
 
667
        } else {
 
668
            g_reply->wreq.edf_reject(est_dur, wu.delay_bound);
 
669
            g_reply->wreq.speed.set_insufficient(0);
 
670
            return INFEASIBLE_WORKLOAD;
 
671
        }
 
672
    } else {
 
673
        double ewd = estimate_duration(wu, bav);
 
674
        if (hard_app(app)) ewd *= 1.3;
 
675
        double est_completion_delay = get_estimated_delay(bav) + ewd;
 
676
        double est_report_delay = std::max(
 
677
            est_completion_delay,
 
678
            g_request->global_prefs.work_buf_min()
 
679
        );
 
680
        double diff = est_report_delay - wu.delay_bound;
 
681
        if (diff > 0) {
 
682
            if (config.debug_send) {
 
683
                log_messages.printf(MSG_NORMAL,
 
684
                    "[send] [WU#%d] deadline miss %d > %d\n",
 
685
                    wu.id, (int)est_report_delay, wu.delay_bound
 
686
                );
 
687
            }
 
688
            g_reply->wreq.speed.set_insufficient(diff);
 
689
            return INFEASIBLE_CPU;
 
690
        } else {
 
691
            if (config.debug_send) {
 
692
                log_messages.printf(MSG_NORMAL,
 
693
                    "[send] [WU#%d] meets deadline: %.2f + %.2f < %d\n",
 
694
                    wu.id, get_estimated_delay(bav), ewd, wu.delay_bound
 
695
                );
 
696
            }
 
697
        }
602
698
    }
603
699
    return 0;
604
700
}
611
707
//    the host probably won't get the result done within the delay bound
612
708
// 4) app isn't in user's "approved apps" list
613
709
//
 
710
// If the job is feasible, return 0 and fill in wu.delay_bound
 
711
// with the delay bound we've decided to use.
 
712
//
614
713
int wu_is_infeasible_fast(
615
 
    WORKUNIT& wu, SCHEDULER_REQUEST& request, SCHEDULER_REPLY& reply, APP& app
 
714
    WORKUNIT& wu,
 
715
    int res_server_state, int res_priority, double res_report_deadline,
 
716
    APP& app, BEST_APP_VERSION& bav
616
717
) {
617
718
    int retval;
618
719
 
619
 
    // homogeneous redundancy, quick check
 
720
    // project-specific check
 
721
    //
 
722
    if (wu_is_infeasible_custom(wu, app, bav)) {
 
723
        return INFEASIBLE_CUSTOM;
 
724
    }
 
725
 
 
726
    // homogeneous redundancy: can't send if app uses HR and
 
727
    // 1) host is of unknown HR class
620
728
    //
621
729
    if (app_hr_type(app)) {
622
 
        if (hr_unknown_platform_type(reply.host, app_hr_type(app))) {
 
730
        if (hr_unknown_class(g_reply->host, app_hr_type(app))) {
623
731
            if (config.debug_send) {
624
 
                log_messages.printf(MSG_DEBUG,
625
 
                    "[HOST#%d] [WU#%d %s] host is of unknown class in HR type %d\n",
626
 
                    reply.host.id, wu.id, wu.name, app_hr_type(app)
 
732
                log_messages.printf(MSG_NORMAL,
 
733
                    "[send] [HOST#%d] [WU#%d %s] host is of unknown class in HR type %d\n",
 
734
                    g_reply->host.id, wu.id, wu.name, app_hr_type(app)
627
735
                );
628
736
            }
629
737
            return INFEASIBLE_HR;
630
738
        }
631
 
        if (already_sent_to_different_platform_quick(request, wu, app)) {
 
739
        if (already_sent_to_different_platform_quick(wu, app)) {
632
740
            if (config.debug_send) {
633
 
                log_messages.printf(MSG_DEBUG,
634
 
                    "[HOST#%d] [WU#%d %s] failed quick HR check: WU is class %d, host is class %d\n",
635
 
                    reply.host.id, wu.id, wu.name, wu.hr_class, hr_class(request.host, app_hr_type(app))
 
741
                log_messages.printf(MSG_NORMAL,
 
742
                    "[send] [HOST#%d] [WU#%d %s] failed quick HR check: WU is class %d, host is class %d\n",
 
743
                    g_reply->host.id, wu.id, wu.name, wu.hr_class, hr_class(g_request->host, app_hr_type(app))
636
744
                );
637
745
            }
638
746
            return INFEASIBLE_HR;
639
747
        }
640
748
    }
641
 
    
 
749
 
642
750
    if (config.one_result_per_user_per_wu || config.one_result_per_host_per_wu) {
643
 
        if (wu_already_in_reply(wu, reply)) {
 
751
        if (wu_already_in_reply(wu)) {
644
752
            return INFEASIBLE_DUP;
645
753
        }
646
754
    }
647
755
 
648
 
    retval = check_memory(wu, request, reply);
649
 
    if (retval) return retval;
650
 
    retval = check_disk(wu, request, reply);
651
 
    if (retval) return retval;
652
 
    retval = check_bandwidth(wu, request, reply);
653
 
    if (retval) return retval;
654
 
 
655
 
    // do this last because EDF sim uses some CPU
 
756
    retval = check_memory(wu);
 
757
    if (retval) return retval;
 
758
    retval = check_disk(wu);
 
759
    if (retval) return retval;
 
760
    retval = check_bandwidth(wu);
 
761
    if (retval) return retval;
 
762
 
 
763
    if (config.non_cpu_intensive) {
 
764
        return 0;
 
765
    }
 
766
 
 
767
    // do deadline check last because EDF sim uses some CPU
656
768
    //
657
 
    if (config.workload_sim && request.have_other_results_list) {
658
 
        double est_cpu = estimate_cpu_duration(wu, reply);
659
 
        if (reply.wreq.edf_reject_test(est_cpu, wu.delay_bound)) {
660
 
            return INFEASIBLE_WORKLOAD;
661
 
        }
662
 
        IP_RESULT candidate("", wu.delay_bound, est_cpu);
663
 
        strcpy(candidate.name, wu.name);
664
 
        if (check_candidate(candidate, effective_ncpus(request, reply), request.ip_results)) {
665
 
            // it passed the feasibility test,
666
 
            // but don't add it the the workload yet;
667
 
            // wait until we commit to sending it
668
 
        } else {
669
 
            reply.wreq.edf_reject(est_cpu, wu.delay_bound);
670
 
            reply.wreq.speed.set_insufficient(0);
671
 
            return INFEASIBLE_WORKLOAD;
672
 
        }
673
 
    } else {
674
 
        retval = check_deadline(wu, app, request, reply);
675
 
        if (retval) return INFEASIBLE_WORKLOAD;
676
 
    }
677
 
 
678
 
    return 0;
 
769
    double opt, pess;
 
770
    get_delay_bound_range(
 
771
        wu, res_server_state, res_priority, res_report_deadline, bav, opt, pess
 
772
    );
 
773
    wu.delay_bound = (int)opt;
 
774
    if (opt == 0) {
 
775
        // this is a resend; skip deadline check
 
776
        return 0;
 
777
    }
 
778
    retval = check_deadline(wu, app, bav);
 
779
    if (retval && (opt != pess)) {
 
780
        wu.delay_bound = (int)pess;
 
781
        retval = check_deadline(wu, app, bav);
 
782
    }
 
783
    return retval;
679
784
}
680
785
 
681
786
// insert "text" right after "after" in the given buffer
682
787
//
683
 
int insert_after(char* buffer, const char* after, const char* text) {
 
788
static int insert_after(char* buffer, const char* after, const char* text) {
684
789
    char* p;
685
790
    char temp[BLOB_SIZE];
686
791
 
687
 
    if (strlen(buffer) + strlen(text) > BLOB_SIZE-1) {
 
792
    if (strlen(buffer) + strlen(text) >= BLOB_SIZE-1) {
688
793
        log_messages.printf(MSG_CRITICAL,
689
 
            "insert_after: overflow: %d %d\n", strlen(buffer), strlen(text)
 
794
            "insert_after: overflow: %d %d\n",
 
795
            (int)strlen(buffer),
 
796
            (int)strlen(text)
690
797
        );
691
798
        return ERR_BUFFER_OVERFLOW;
692
799
    }
707
814
// add elements to WU's xml_doc,
708
815
// in preparation for sending it to a client
709
816
//
710
 
int insert_wu_tags(WORKUNIT& wu, APP& app) {
 
817
static int insert_wu_tags(WORKUNIT& wu, APP& app) {
711
818
    char buf[BLOB_SIZE];
712
 
    
 
819
 
713
820
    sprintf(buf,
714
821
        "    <rsc_fpops_est>%f</rsc_fpops_est>\n"
715
822
        "    <rsc_fpops_bound>%f</rsc_fpops_bound>\n"
727
834
    return insert_after(wu.xml_doc, "<workunit>\n", buf);
728
835
}
729
836
 
730
 
// add the given workunit to a reply.
731
 
// Add the app and app_version to the reply also.
 
837
// Add the given workunit, app, and app version to a reply.
732
838
//
733
 
int add_wu_to_reply(
 
839
static int add_wu_to_reply(
734
840
    WORKUNIT& wu, SCHEDULER_REPLY& reply, APP* app, BEST_APP_VERSION* bavp
735
841
) {
736
842
    int retval;
737
843
    WORKUNIT wu2, wu3;
738
 
    
 
844
 
739
845
    APP_VERSION* avp = bavp->avp;
740
 
    if (avp == (APP_VERSION*)1) avp = NULL;
741
846
 
742
847
    // add the app, app_version, and workunit to the reply,
743
848
    // but only if they aren't already there
744
849
    //
745
850
    if (avp) {
746
851
        APP_VERSION av2=*avp, *avp2=&av2;
747
 
        
748
 
        if (config.choose_download_url_by_timezone) {
749
 
            process_av_timezone(reply, avp, av2);
 
852
 
 
853
        if (strlen(config.replace_download_url_by_timezone)) {
 
854
            process_av_timezone(avp, av2);
750
855
        }
751
 
        
752
 
        reply.insert_app_unique(*app);
 
856
 
 
857
        g_reply->insert_app_unique(*app);
753
858
        av2.bavp = bavp;
754
 
        reply.insert_app_version_unique(*avp2);
 
859
        g_reply->insert_app_version_unique(*avp2);
755
860
        if (config.debug_send) {
756
 
            log_messages.printf(MSG_DEBUG,
757
 
                "[HOST#%d] Sending app_version %s %d %d\n",
758
 
                reply.host.id, app->name, avp2->platformid, avp2->version_num
 
861
            log_messages.printf(MSG_NORMAL,
 
862
                "[send] Sending app_version %s %d %d %s; projected %.2f GFLOPS\n",
 
863
                app->name,
 
864
                avp2->platformid, avp2->version_num, avp2->plan_class,
 
865
                bavp->host_usage.projected_flops/1e9
759
866
            );
760
867
        }
761
868
    }
762
869
 
763
 
    // add time estimate to reply
 
870
    // modify the WU's xml_doc; add <name>, <rsc_*> etc.
764
871
    //
765
872
    wu2 = wu;       // make copy since we're going to modify its XML field
 
873
 
 
874
    // adjust FPOPS figures for anonymous platform
 
875
    //
 
876
    if (bavp->cavp) {
 
877
        wu2.rsc_fpops_est *= bavp->cavp->rsc_fpops_scale;
 
878
        wu2.rsc_fpops_bound *= bavp->cavp->rsc_fpops_scale;
 
879
    }
766
880
    retval = insert_wu_tags(wu2, *app);
767
881
    if (retval) {
768
882
        log_messages.printf(MSG_CRITICAL, "insert_wu_tags failed %d\n", retval);
769
883
        return retval;
770
884
    }
771
 
    wu3=wu2;
772
 
    if (config.choose_download_url_by_timezone) {
773
 
        process_wu_timezone(reply, wu2, wu3);
 
885
    wu3 = wu2;
 
886
    if (strlen(config.replace_download_url_by_timezone)) {
 
887
        process_wu_timezone(wu2, wu3);
774
888
    }
775
 
    
776
 
    reply.insert_workunit_unique(wu3);
 
889
 
 
890
    g_reply->insert_workunit_unique(wu3);
777
891
 
778
892
    // switch to tighter policy for estimating delay
779
893
    //
780
894
    return 0;
781
895
}
782
896
 
783
 
int insert_name_tags(RESULT& result, WORKUNIT const& wu) {
 
897
// add <name> tags to result's xml_doc_in
 
898
//
 
899
static int insert_name_tags(RESULT& result, WORKUNIT const& wu) {
784
900
    char buf[256];
785
901
    int retval;
786
902
 
793
909
    return 0;
794
910
}
795
911
 
796
 
int insert_deadline_tag(RESULT& result) {
 
912
static int insert_deadline_tag(RESULT& result) {
797
913
    char buf[256];
798
914
    sprintf(buf, "<report_deadline>%d</report_deadline>\n", result.report_deadline);
799
915
    int retval = insert_after(result.xml_doc_in, "<result>\n", buf);
806
922
    char buf[256];
807
923
 
808
924
    dbwu.id = wu.id;
809
 
    
 
925
 
810
926
    // SQL note: can't use min() here
811
927
    //
812
928
    sprintf(buf,
818
934
 
819
935
// return true iff a result for same WU is already being sent
820
936
//
821
 
bool wu_already_in_reply(WORKUNIT& wu, SCHEDULER_REPLY& reply) {
 
937
bool wu_already_in_reply(WORKUNIT& wu) {
822
938
    unsigned int i;
823
 
    for (i=0; i<reply.results.size(); i++) {
824
 
        if (wu.id == reply.results[i].workunitid) {
 
939
    for (i=0; i<g_reply->results.size(); i++) {
 
940
        if (wu.id == g_reply->results[i].workunitid) {
825
941
            return true;
826
942
        }
827
943
    }
841
957
// and we haven't exceeded result per RPC limit,
842
958
// and we haven't exceeded results per day limit
843
959
//
844
 
bool work_needed(
845
 
    SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply, bool locality_sched
846
 
) {
 
960
bool work_needed(bool locality_sched) {
847
961
    if (locality_sched) {
848
962
        // if we've failed to send a result because of a transient condition,
849
963
        // return false to preserve invariant
850
964
        //
851
 
        if (reply.wreq.disk.insufficient || reply.wreq.speed.insufficient || reply.wreq.mem.insufficient || reply.wreq.no_allowed_apps_available) {
852
 
            return false;
853
 
        }
854
 
    }
855
 
    if (reply.wreq.seconds_to_fill <= 0) return false;
856
 
    if (reply.wreq.disk_available <= 0) {
857
 
        return false;
858
 
    }
859
 
    if (reply.wreq.nresults >= config.max_wus_to_send) return false;
860
 
 
861
 
    int ncpus = effective_ncpus(sreq, reply);
862
 
 
863
 
    // host.max_results_day is between 1 and config.daily_result_quota inclusive
864
 
    // wreq.daily_result_quota is between ncpus
865
 
    // and ncpus*host.max_results_day inclusive
866
 
    //
867
 
    if (config.daily_result_quota) {
868
 
        if (reply.host.max_results_day == 0 || reply.host.max_results_day>config.daily_result_quota) {
869
 
            reply.host.max_results_day = config.daily_result_quota;
870
 
        }
871
 
        reply.wreq.daily_result_quota = ncpus*reply.host.max_results_day;
872
 
        if (reply.host.nresults_today >= reply.wreq.daily_result_quota) {
873
 
            reply.wreq.daily_result_quota_exceeded = true;
874
 
            return false;
875
 
        }
876
 
    }
877
 
 
878
 
    if (config.max_wus_in_progress) {
879
 
        if (reply.wreq.nresults_on_host >= config.max_wus_in_progress*ncpus) {
 
965
        if (g_wreq->disk.insufficient || g_wreq->speed.insufficient || g_wreq->mem.insufficient || g_wreq->no_allowed_apps_available) {
880
966
            if (config.debug_send) {
881
 
                log_messages.printf(MSG_DEBUG,
882
 
                    "in-progress job limit exceeded; %d > %d*%d\n",
883
 
                    reply.wreq.nresults_on_host, config.max_wus_in_progress, ncpus
 
967
                log_messages.printf(MSG_NORMAL,
 
968
                    "[send] stopping work search - locality condition\n"
884
969
                );
885
970
            }
886
 
            reply.wreq.cache_size_exceeded = true;
887
971
            return false;
888
972
        }
889
973
    }
890
 
    return true;
891
 
}
892
 
 
893
 
void SCHEDULER_REPLY::got_good_result() {
894
 
    host.max_results_day *= 2;
895
 
    if (host.max_results_day > config.daily_result_quota) {
896
 
        host.max_results_day = config.daily_result_quota;
897
 
    }
898
 
}
899
 
 
900
 
void SCHEDULER_REPLY::got_bad_result() {
901
 
    host.max_results_day -= 1;
902
 
    if (host.max_results_day < 1) {
903
 
        host.max_results_day = 1;
 
974
 
 
975
    // see if we've reached limits on in-progress jobs
 
976
    //
 
977
    bool some_type_allowed = false;
 
978
    if (config.max_jobs_in_progress.exceeded(NULL, true)) {
 
979
        if (config.debug_quota) {
 
980
            log_messages.printf(MSG_NORMAL,
 
981
                "[quota] reached limit on GPU jobs in progress\n"
 
982
            );
 
983
        }
 
984
        g_wreq->clear_gpu_req();
 
985
        if (g_wreq->effective_ngpus) {
 
986
            g_wreq->max_jobs_on_host_gpu_exceeded = true;
 
987
        }
 
988
    } else {
 
989
        some_type_allowed = true;
 
990
    }
 
991
    if (config.max_jobs_in_progress.exceeded(NULL, false)) {
 
992
        if (config.debug_quota) {
 
993
            log_messages.printf(MSG_NORMAL,
 
994
                "[quota] reached limit on CPU jobs in progress\n"
 
995
            );
 
996
        }
 
997
        g_wreq->clear_cpu_req();
 
998
        g_wreq->max_jobs_on_host_cpu_exceeded = true;
 
999
    } else {
 
1000
        some_type_allowed = true;
 
1001
    }
 
1002
    if (!some_type_allowed) {
 
1003
        if (config.debug_send) {
 
1004
            log_messages.printf(MSG_NORMAL,
 
1005
                "[send] in-progress job limit exceeded\n"
 
1006
            );
 
1007
        }
 
1008
        g_wreq->max_jobs_on_host_exceeded = true;
 
1009
        return false;
 
1010
    }
 
1011
 
 
1012
    // see if we've reached max jobs per RPC
 
1013
    //
 
1014
    if (g_wreq->njobs_sent >= g_wreq->max_jobs_per_rpc) {
 
1015
        if (config.debug_quota) {
 
1016
            log_messages.printf(MSG_NORMAL,
 
1017
                "[quota] stopping work search - njobs %d >= max_jobs_per_rpc %d\n",
 
1018
                g_wreq->njobs_sent, g_wreq->max_jobs_per_rpc
 
1019
            );
 
1020
        }
 
1021
        return false;
 
1022
    }
 
1023
 
 
1024
#if 0
 
1025
    log_messages.printf(MSG_NORMAL,
 
1026
        "work_needed: spec req %d sec to fill %.2f; CPU (%.2f, %.2f) CUDA (%.2f, %.2f) ATI(%.2f, %.2f)\n",
 
1027
        g_wreq->rsc_spec_request,
 
1028
        g_wreq->seconds_to_fill,
 
1029
        g_wreq->cpu_req_secs, g_wreq->cpu_req_instances,
 
1030
        g_wreq->cuda_req_secs, g_wreq->cuda_req_instances,
 
1031
        g_wreq->ati_req_secs, g_wreq->ati_req_instances
 
1032
    );
 
1033
#endif
 
1034
    if (g_wreq->rsc_spec_request) {
 
1035
        if (g_wreq->need_cpu()) {
 
1036
            return true;
 
1037
        }
 
1038
        if (g_wreq->need_cuda()) {
 
1039
            return true;
 
1040
        }
 
1041
        if (g_wreq->need_ati()) {
 
1042
            return true;
 
1043
        }
 
1044
    } else {
 
1045
        if (g_wreq->seconds_to_fill > 0) {
 
1046
            return true;
 
1047
        }
 
1048
    }
 
1049
    if (config.debug_send) {
 
1050
        log_messages.printf(MSG_NORMAL, "[send] don't need more work\n");
 
1051
    }
 
1052
    return false;
 
1053
}
 
1054
 
 
1055
// return the app version ID, or -2/-3/-4 if anonymous platform
 
1056
//
 
1057
inline static int get_app_version_id(BEST_APP_VERSION* bavp) {
 
1058
    if (bavp->avp) {
 
1059
        return bavp->avp->id;
 
1060
    } else {
 
1061
        return bavp->cavp->host_usage.resource_type();
904
1062
    }
905
1063
}
906
1064
 
907
1065
int add_result_to_reply(
908
 
    DB_RESULT& result, WORKUNIT& wu, SCHEDULER_REQUEST& request,
909
 
    SCHEDULER_REPLY& reply, BEST_APP_VERSION* bavp
 
1066
    DB_RESULT& result, WORKUNIT& wu, BEST_APP_VERSION* bavp,
 
1067
    bool locality_scheduling
910
1068
) {
911
1069
    int retval;
912
 
    double wu_seconds_filled;
913
1070
    bool resent_result = false;
914
1071
    APP* app = ssp->lookup_app(wu.appid);
915
1072
 
916
 
    retval = add_wu_to_reply(wu, reply, app, bavp);
 
1073
    retval = add_wu_to_reply(wu, *g_reply, app, bavp);
917
1074
    if (retval) return retval;
918
1075
 
919
 
    // in the scheduling locality case,
920
 
    // reduce the available space by LESS than the workunit rsc_disk_bound,
921
 
    // IF the host already has the file OR the file was not already sent.
 
1076
    // Adjust available disk space.
 
1077
    // In the scheduling locality case,
 
1078
    // reduce the available space by less than the workunit rsc_disk_bound,
 
1079
    // if the host already has the file or the file was not already sent.
922
1080
    //
923
 
    if (!config.locality_scheduling ||
924
 
        decrement_disk_space_locality(wu, request, reply)
925
 
    ) {
926
 
        reply.wreq.disk_available -= wu.rsc_disk_bound;
 
1081
    if (!locality_scheduling || decrement_disk_space_locality(wu)) {
 
1082
        g_wreq->disk_available -= wu.rsc_disk_bound;
927
1083
    }
928
1084
 
929
1085
    // update the result in DB
930
1086
    //
931
 
    result.hostid = reply.host.id;
932
 
    result.userid = reply.user.id;
 
1087
    result.hostid = g_reply->host.id;
 
1088
    result.userid = g_reply->user.id;
933
1089
    result.sent_time = time(0);
 
1090
    result.report_deadline = result.sent_time + wu.delay_bound;
 
1091
    result.flops_estimate = bavp->host_usage.peak_flops;
 
1092
    result.app_version_id = get_app_version_id(bavp);
934
1093
    int old_server_state = result.server_state;
935
1094
 
936
 
    int delay_bound = wu.delay_bound;
937
1095
    if (result.server_state != RESULT_SERVER_STATE_IN_PROGRESS) {
938
 
        // We are sending this result for the first time
939
 
        //
940
 
        // If the workunit needs reliable and is being sent to a reliable host,
941
 
        // then shorten the delay bound by the percent specified
942
 
        //
943
 
        if (config.reliable_on_priority && result.priority >= config.reliable_on_priority && config.reliable_reduced_delay_bound > 0.01
944
 
        ) {
945
 
                        double reduced_delay_bound = delay_bound*config.reliable_reduced_delay_bound;
946
 
                        double est_wallclock_duration = estimate_wallclock_duration(wu, request, reply);
947
 
            // Check to see how reasonable this reduced time is.
948
 
            // Increase it to twice the estimated delay bound
949
 
            // if all the following apply:
950
 
            //
951
 
                        // 1) Twice the estimate is longer then the reduced delay bound
952
 
                        // 2) Twice the estimate is less then the original delay bound
953
 
                        // 3) Twice the estimate is less then the twice the reduced delay bound
954
 
                        if (est_wallclock_duration*2 > reduced_delay_bound && est_wallclock_duration*2 < delay_bound && est_wallclock_duration*2 < delay_bound*config.reliable_reduced_delay_bound*2 ) {
955
 
                        reduced_delay_bound = est_wallclock_duration*2;
956
 
            }
957
 
                        delay_bound = (int) reduced_delay_bound;
958
 
        }
959
 
 
960
 
        result.report_deadline = result.sent_time + delay_bound;
 
1096
        // We're sending this result for the first time
 
1097
        //
961
1098
        result.server_state = RESULT_SERVER_STATE_IN_PROGRESS;
962
1099
    } else {
963
1100
        // Result was already sent to this host but was lost,
964
 
        // so we are resending it.
 
1101
        // so we're resending it.
965
1102
        //
966
1103
        resent_result = true;
967
 
 
968
 
        // TODO: explain the following
969
 
        //
970
 
        if (result.report_deadline < result.sent_time) {
971
 
            result.report_deadline = result.sent_time + 10;
972
 
        }
973
 
        if (result.report_deadline > result.sent_time + delay_bound) {
974
 
            result.report_deadline = result.sent_time + delay_bound;
975
 
        }
976
1104
 
977
1105
        if (config.debug_send) {
978
 
            log_messages.printf(MSG_DEBUG,
979
 
                "[RESULT#%d] [HOST#%d] (resend lost work)\n",
980
 
                result.id, reply.host.id
 
1106
            log_messages.printf(MSG_NORMAL,
 
1107
                "[send] [RESULT#%d] [HOST#%d] (resend lost work)\n",
 
1108
                result.id, g_reply->host.id
981
1109
            );
982
1110
        }
983
1111
    }
985
1113
    if (retval == ERR_DB_NOT_FOUND) {
986
1114
        log_messages.printf(MSG_CRITICAL,
987
1115
            "[RESULT#%d] [HOST#%d]: CAN'T SEND, already sent to another host\n",
988
 
            result.id, reply.host.id
 
1116
            result.id, g_reply->host.id
989
1117
        );
990
1118
    } else if (retval) {
991
1119
        log_messages.printf(MSG_CRITICAL,
994
1122
    }
995
1123
    if (retval) return retval;
996
1124
 
997
 
    wu_seconds_filled = estimate_wallclock_duration(wu, request, reply);
 
1125
    double est_dur = estimate_duration(wu, *bavp);
998
1126
    if (config.debug_send) {
999
1127
        log_messages.printf(MSG_NORMAL,
1000
 
            "[HOST#%d] Sending [RESULT#%d %s] (fills %.2f seconds)\n",
1001
 
            reply.host.id, result.id, result.name, wu_seconds_filled
 
1128
            "[HOST#%d] Sending [RESULT#%d %s] (est. dur. %.2f seconds)\n",
 
1129
            g_reply->host.id, result.id, result.name, est_dur
1002
1130
        );
1003
1131
    }
1004
1132
 
1030
1158
        return retval;
1031
1159
    }
1032
1160
    result.bavp = bavp;
1033
 
    reply.insert_result(result);
1034
 
    reply.wreq.seconds_to_fill -= wu_seconds_filled;
1035
 
    request.estimated_delay += wu_seconds_filled/effective_ncpus(request, reply);
1036
 
    reply.wreq.nresults++;
1037
 
    reply.wreq.nresults_on_host++;
1038
 
    if (!resent_result) reply.host.nresults_today++;
 
1161
    g_reply->insert_result(result);
 
1162
    if (g_wreq->rsc_spec_request) {
 
1163
        if (bavp->host_usage.ncudas) {
 
1164
            g_wreq->cuda_req_secs -= est_dur;
 
1165
            g_wreq->cuda_req_instances -= bavp->host_usage.ncudas;
 
1166
        } else if (bavp->host_usage.natis) {
 
1167
            g_wreq->ati_req_secs -= est_dur;
 
1168
            g_wreq->ati_req_instances -= bavp->host_usage.natis;
 
1169
        } else {
 
1170
            g_wreq->cpu_req_secs -= est_dur;
 
1171
            g_wreq->cpu_req_instances -= bavp->host_usage.avg_ncpus;
 
1172
        }
 
1173
    } else {
 
1174
        g_wreq->seconds_to_fill -= est_dur;
 
1175
    }
 
1176
    update_estimated_delay(*bavp, est_dur);
 
1177
    g_wreq->njobs_sent++;
 
1178
    config.max_jobs_in_progress.register_job(app, bavp->host_usage.uses_gpu());
 
1179
    if (!resent_result) {
 
1180
        DB_HOST_APP_VERSION* havp = bavp->host_app_version();
 
1181
        if (havp) {
 
1182
            havp->n_jobs_today++;
 
1183
        }
 
1184
    }
1039
1185
 
1040
1186
    // add this result to workload for simulation
1041
1187
    //
1042
 
    if (config.workload_sim && request.have_other_results_list) {
1043
 
        double est_cpu = estimate_cpu_duration(wu, reply);
1044
 
        IP_RESULT ipr ("", time(0)+wu.delay_bound, est_cpu);
1045
 
        request.ip_results.push_back(ipr);
 
1188
    if (config.workload_sim && g_request->have_other_results_list) {
 
1189
        IP_RESULT ipr ("", time(0)+wu.delay_bound, est_dur);
 
1190
        g_request->ip_results.push_back(ipr);
1046
1191
    }
1047
1192
 
1048
1193
    // mark job as done if debugging flag is set;
1067
1212
    // mark it as replicated
1068
1213
    //
1069
1214
    if (wu.target_nresults == 1 && app->target_nresults > 1) {
1070
 
        if (reply.wreq.trust) {
 
1215
        if (bavp->trusted) {
1071
1216
            if (config.debug_send) {
1072
 
                log_messages.printf(MSG_DEBUG,
1073
 
                    "[WU#%d] sending to trusted host, not replicating\n", wu.id
 
1217
                log_messages.printf(MSG_NORMAL,
 
1218
                    "[send] [WU#%d] using trusted app version, not replicating\n", wu.id
1074
1219
                );
1075
1220
            }
1076
1221
        } else {
1082
1227
            );
1083
1228
            dbwu.id = wu.id;
1084
1229
            if (config.debug_send) {
1085
 
                log_messages.printf(MSG_DEBUG,
1086
 
                    "[WU#%d] sending to untrusted host, replicating\n", wu.id
 
1230
                log_messages.printf(MSG_NORMAL,
 
1231
                    "[send] [WU#%d] sending to untrusted host, replicating\n", wu.id
1087
1232
                );
1088
1233
            }
1089
1234
            retval = dbwu.update_field(buf);
1098
1243
    return 0;
1099
1244
}
1100
1245
 
1101
 
// send messages to user about why jobs were or weren't sent
1102
 
//
1103
 
static void explain_to_user(SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply) {
1104
 
    char helpful[512];
 
1246
// Send high-priority messages about things the user can change easily
 
1247
// (namely the driver version)
 
1248
// and low-priority messages about things that can't easily be changed,
 
1249
// but which may be interfering with getting tasks or latest apps
 
1250
//
 
1251
static void send_gpu_messages(
 
1252
    GPU_REQUIREMENTS& req, double ram, int version, const char* rsc_name
 
1253
) {
 
1254
    char buf[256];
 
1255
    if (ram < req.min_ram) {
 
1256
        sprintf(buf,
 
1257
            "A minimum of %d MB (preferably %d MB) of video RAM is needed to process tasks using your computer's %s",
 
1258
            (int) (req.min_ram/MEGA),
 
1259
            (int) (req.opt_ram/MEGA),
 
1260
            rsc_name
 
1261
        );
 
1262
        g_reply->insert_message(buf, "low");
 
1263
    } else {
 
1264
        if (version) {
 
1265
            if (version < req.min_driver_version) {
 
1266
                sprintf(buf,
 
1267
                    "%s: %s",
 
1268
                    rsc_name,
 
1269
                    _("Upgrade to the latest driver to process tasks using your computer's GPU")
 
1270
                );
 
1271
                g_reply->insert_message(buf, "notice");
 
1272
            } else if (version < req.opt_driver_version) {
 
1273
                sprintf(buf,
 
1274
                    "%s: %s",
 
1275
                    rsc_name,
 
1276
                    _("Upgrade to the latest driver to use all of this project's GPU applications")
 
1277
                );
 
1278
                g_reply->insert_message(buf, "low");
 
1279
            }
 
1280
        }
 
1281
    }
 
1282
}
 
1283
 
 
1284
// send messages to user about why jobs were or weren't sent,
 
1285
// recommendations for GPU driver upgrades, etc.
 
1286
//
 
1287
static void send_user_messages() {
 
1288
    char buf[512];
1105
1289
    unsigned int i;
1106
1290
    int j;
1107
1291
 
 
1292
    // Mac client with GPU but too-old client
 
1293
    //
 
1294
    if (g_request->coprocs.cuda.count
 
1295
        && ssp->have_cuda_apps
 
1296
        && strstr(g_request->host.os_name, "Darwin")
 
1297
        && g_request->core_client_version < 61028
 
1298
    ) {
 
1299
        g_reply->insert_message(
 
1300
            _("A newer version of BOINC is needed to use your NVIDIA GPU; please upgrade to the current version"),
 
1301
            "notice"
 
1302
        );
 
1303
    }
 
1304
 
 
1305
    // GPU-only project, client lacks GPU
 
1306
    //
 
1307
    bool usable_gpu = (ssp->have_cuda_apps && g_request->coprocs.cuda.count)
 
1308
        || (ssp->have_ati_apps && g_request->coprocs.ati.count);
 
1309
    if (!ssp->have_cpu_apps && !usable_gpu) {
 
1310
        if (ssp->have_cuda_apps) {
 
1311
            if (ssp->have_ati_apps) {
 
1312
                g_reply->insert_message(
 
1313
                    _("An NVIDIA or ATI GPU is required to run tasks for this project"),
 
1314
                    "notice"
 
1315
                );
 
1316
            } else {
 
1317
                g_reply->insert_message(
 
1318
                    _("An NVIDIA GPU is required to run tasks for this project"),
 
1319
                    "notice"
 
1320
                );
 
1321
            }
 
1322
        } else if (ssp->have_ati_apps) {
 
1323
            g_reply->insert_message(
 
1324
                _("An ATI GPU is required to run tasks for this project"),
 
1325
                "notice"
 
1326
            );
 
1327
        }
 
1328
    }
 
1329
 
 
1330
    if (g_request->coprocs.cuda.count && ssp->have_cuda_apps) {
 
1331
        send_gpu_messages(cuda_requirements,
 
1332
            g_request->coprocs.cuda.prop.dtotalGlobalMem,
 
1333
            g_request->coprocs.cuda.display_driver_version,
 
1334
            "NVIDIA GPU"
 
1335
        );
 
1336
    }
 
1337
    if (g_request->coprocs.ati.count && ssp->have_ati_apps) {
 
1338
        send_gpu_messages(ati_requirements,
 
1339
            g_request->coprocs.ati.attribs.localRAM*MEGA,
 
1340
            g_request->coprocs.ati.version_num,
 
1341
            "ATI GPU"
 
1342
        );
 
1343
    }
 
1344
 
 
1345
 
1108
1346
    // If work was sent from apps the user did not select, explain.
1109
1347
    // NOTE: this will have to be done differently with matchmaker scheduling
1110
1348
    //
1111
 
    if (!config.locality_scheduling && !config.matchmaker) {
1112
 
        if (reply.wreq.nresults && !reply.wreq.user_apps_only) {
1113
 
            USER_MESSAGE um(
 
1349
    if (!config.locality_scheduling && !config.locality_scheduler_fraction && !config.matchmaker) {
 
1350
        if (g_wreq->njobs_sent && !g_wreq->user_apps_only) {
 
1351
            g_reply->insert_message(
1114
1352
                "No work can be sent for the applications you have selected",
1115
 
                "high"
 
1353
                "low"
1116
1354
            );
1117
 
            reply.insert_message(um);
1118
1355
 
1119
1356
            // Inform the user about applications with no work
1120
1357
            //
1121
 
            for (i=0; i<reply.wreq.host_info.preferred_apps.size(); i++) {
1122
 
                if (!reply.wreq.host_info.preferred_apps[i].work_available) {
1123
 
                    APP* app = ssp->lookup_app(reply.wreq.host_info.preferred_apps[i].appid);
 
1358
            for (i=0; i<g_wreq->preferred_apps.size(); i++) {
 
1359
                if (!g_wreq->preferred_apps[i].work_available) {
 
1360
                    APP* app = ssp->lookup_app(g_wreq->preferred_apps[i].appid);
1124
1361
                    // don't write message if the app is deprecated
1125
1362
                    //
1126
1363
                    if (app) {
1127
1364
                        char explanation[256];
1128
1365
                        sprintf(explanation,
1129
1366
                            "No work is available for %s",
1130
 
                            find_user_friendly_name(reply.wreq.host_info.preferred_apps[i].appid)
 
1367
                            find_user_friendly_name(g_wreq->preferred_apps[i].appid)
1131
1368
                        );
1132
 
                        USER_MESSAGE um2(explanation, "high");
1133
 
                        reply.insert_message(um2);
 
1369
                        g_reply->insert_message( explanation, "low");
1134
1370
                    }
1135
1371
                }
1136
1372
            }
1138
1374
            // Tell the user about applications they didn't qualify for
1139
1375
            //
1140
1376
            for (j=0; j<preferred_app_message_index; j++){
1141
 
                reply.insert_message(reply.wreq.no_work_messages.at(j));
 
1377
                g_reply->insert_message(g_wreq->no_work_messages.at(j));
1142
1378
            }
1143
 
            USER_MESSAGE um1(
1144
 
                "You have selected to receive work from other applications if no work is available for the applications you selected",
1145
 
                "high"
1146
 
            );
1147
 
            reply.insert_message(um1);
1148
 
            USER_MESSAGE um2("Sending work from other applications", "high");
1149
 
            reply.insert_message(um2);
 
1379
            g_reply->insert_message(
 
1380
                "Your preferences allow work from applications other than those selected",
 
1381
                "low"
 
1382
            );
 
1383
            g_reply->insert_message(
 
1384
                "Sending work from other applications", "low"
 
1385
            );
1150
1386
        }
1151
1387
    }
1152
1388
 
1153
1389
    // if client asked for work and we're not sending any, explain why
1154
1390
    //
1155
 
    if (reply.wreq.nresults == 0) {
1156
 
        reply.set_delay(DELAY_NO_WORK_TEMP);
1157
 
        USER_MESSAGE um2("No work sent", "high");
1158
 
        reply.insert_message(um2);
1159
 
        // Inform the user about applications with no work
1160
 
        for (i=0; i<reply.wreq.host_info.preferred_apps.size(); i++) {
1161
 
                if (!reply.wreq.host_info.preferred_apps[i].work_available) {
1162
 
                        APP* app = ssp->lookup_app(reply.wreq.host_info.preferred_apps[i].appid);
1163
 
                        // don't write message if the app is deprecated
1164
 
                        if (app != NULL) {
1165
 
                                char explanation[256];
1166
 
                                sprintf(explanation, "No work is available for %s",
1167
 
                        find_user_friendly_name(reply.wreq.host_info.preferred_apps[i].appid)
 
1391
    if (g_wreq->njobs_sent == 0) {
 
1392
        g_reply->set_delay(DELAY_NO_WORK_TEMP);
 
1393
        g_reply->insert_message("No work sent", "low");
 
1394
 
 
1395
        // Tell the user about applications with no work
 
1396
        //
 
1397
        for (i=0; i<g_wreq->preferred_apps.size(); i++) {
 
1398
            if (!g_wreq->preferred_apps[i].work_available) {
 
1399
                APP* app = ssp->lookup_app(g_wreq->preferred_apps[i].appid);
 
1400
                // don't write message if the app is deprecated
 
1401
                if (app != NULL) {
 
1402
                    sprintf(buf, "No work is available for %s",
 
1403
                        find_user_friendly_name(
 
1404
                            g_wreq->preferred_apps[i].appid
 
1405
                        )
1168
1406
                    );
1169
 
                                USER_MESSAGE um(explanation, "high");
1170
 
                                reply.insert_message(um);
1171
 
                        }
1172
 
                }
1173
 
        }
1174
 
        // Inform the user about applications they didn't qualify for
1175
 
        for (i=0; i<reply.wreq.no_work_messages.size(); i++){
1176
 
                reply.insert_message(reply.wreq.no_work_messages.at(i));
1177
 
        }
1178
 
        if (reply.wreq.no_app_version) {
1179
 
            reply.set_delay(DELAY_NO_WORK_PERM);
1180
 
        }
1181
 
        if (reply.wreq.no_allowed_apps_available) {
1182
 
            USER_MESSAGE um(
1183
 
                "No work available for the applications you have selected.  Please check your settings on the web site.",
1184
 
                "high"
 
1407
                    g_reply->insert_message(buf, "low");
 
1408
                }
 
1409
            }
 
1410
        }
 
1411
 
 
1412
        // Tell the user about applications they didn't qualify for
 
1413
        //
 
1414
        for (i=0; i<g_wreq->no_work_messages.size(); i++){
 
1415
            g_reply->insert_message(g_wreq->no_work_messages.at(i));
 
1416
        }
 
1417
        if (g_wreq->no_allowed_apps_available) {
 
1418
            g_reply->insert_message(
 
1419
                _("No work available for the applications you have selected.  Please check your project preferences on the web site."),
 
1420
                "notice"
1185
1421
            );
1186
 
            reply.insert_message(um);
1187
1422
        }
1188
 
        if (reply.wreq.speed.insufficient) {
1189
 
            if (sreq.core_client_version>419) {
1190
 
                sprintf(helpful,
1191
 
                    "(won't finish in time) "
1192
 
                    "BOINC runs %.1f%% of time, computation enabled %.1f%% of that",
1193
 
                    100.0*reply.host.on_frac, 100.0*reply.host.active_frac
 
1423
        if (g_wreq->speed.insufficient) {
 
1424
            if (g_request->core_client_version>41900) {
 
1425
                sprintf(buf,
 
1426
                    "Tasks won't finish in time: BOINC runs %.1f%% of the time; computation is enabled %.1f%% of that",
 
1427
                    100*g_reply->host.on_frac, 100*g_reply->host.active_frac
1194
1428
                );
1195
1429
            } else {
1196
 
                sprintf(helpful,
1197
 
                    "(won't finish in time) "
1198
 
                    "Computer available %.1f%% of time",
1199
 
                    100.0*reply.host.on_frac
1200
 
                );
1201
 
            }
1202
 
            USER_MESSAGE um(helpful, "high");
1203
 
            reply.insert_message(um);
1204
 
        }
1205
 
        if (reply.wreq.hr_reject_temp) {
1206
 
            USER_MESSAGE um(
1207
 
                "(there was work but it was committed to other platforms)",
1208
 
                "high"
1209
 
            );
1210
 
            reply.insert_message(um);
1211
 
        }
1212
 
        if (reply.wreq.hr_reject_perm) {
1213
 
            USER_MESSAGE um(
1214
 
                "(your platform is not supported by this project)",
1215
 
                "high"
1216
 
            );
1217
 
            reply.insert_message(um);
1218
 
        }
1219
 
        if (reply.wreq.outdated_core) {
1220
 
            USER_MESSAGE um(
1221
 
                " (your BOINC client is old - please install current version)",
1222
 
                "high"
1223
 
            );
1224
 
            reply.insert_message(um);
1225
 
            reply.set_delay(DELAY_NO_WORK_PERM);
1226
 
            log_messages.printf(MSG_NORMAL,
1227
 
                "Not sending work because client is outdated\n"
1228
 
            );
1229
 
        }
1230
 
        if (reply.wreq.excessive_work_buf) {
1231
 
            USER_MESSAGE um(
1232
 
                "(Your network connection interval is longer than WU deadline)",
1233
 
                "high"
1234
 
            );
1235
 
            reply.insert_message(um);
1236
 
        }
1237
 
        if (reply.wreq.no_jobs_available) {
1238
 
            USER_MESSAGE um(
1239
 
                "(Project has no jobs available)",
1240
 
                "high"
1241
 
            );
1242
 
            reply.insert_message(um);
1243
 
        }
1244
 
        if (reply.wreq.daily_result_quota_exceeded) {
1245
 
            struct tm *rpc_time_tm;
1246
 
            int delay_time;
1247
 
 
1248
 
            sprintf(helpful, "(reached daily quota of %d results)", reply.wreq.daily_result_quota);
1249
 
            USER_MESSAGE um(helpful, "high");
1250
 
            reply.insert_message(um);
1251
 
            log_messages.printf(MSG_NORMAL,
1252
 
                "Daily result quota exceeded for host %d\n",
1253
 
                reply.host.id
1254
 
            );
1255
 
 
1256
 
            // set delay so host won't return until a random time in
1257
 
            // the first hour of the next day.
1258
 
            // This is to prevent a lot of hosts from flooding the scheduler
1259
 
            // with requests at the same time of day.
1260
 
            //
1261
 
            time_t t = reply.host.rpc_time;
1262
 
            rpc_time_tm = localtime(&t);
1263
 
            delay_time  = (23 - rpc_time_tm->tm_hour) * 3600
1264
 
                + (59 - rpc_time_tm->tm_min) * 60
1265
 
                + (60 - rpc_time_tm->tm_sec)
1266
 
                + (int)(3600*(double)rand()/(double)RAND_MAX);
1267
 
            reply.set_delay(delay_time);
1268
 
        }
1269
 
        if (reply.wreq.cache_size_exceeded) {
1270
 
            sprintf(helpful, "(reached per-CPU limit of %d tasks)",
1271
 
                config.max_wus_in_progress
1272
 
            );
1273
 
            USER_MESSAGE um(helpful, "high");
1274
 
            reply.insert_message(um);
1275
 
            reply.set_delay(DELAY_NO_WORK_CACHE);
1276
 
            log_messages.printf(MSG_NORMAL,
1277
 
                "host %d already has %d result(s) in progress\n",
1278
 
                reply.host.id, reply.wreq.nresults_on_host
1279
 
            );
1280
 
        }        
1281
 
    }
1282
 
}
1283
 
 
1284
 
static void get_running_frac(SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply) {
1285
 
    if (sreq.core_client_version<=419) {
1286
 
        reply.wreq.running_frac = reply.host.on_frac;
 
1430
                sprintf(buf,
 
1431
                    "Tasks won't finish in time: Computer available %.1f%% of the time",
 
1432
                    100*g_reply->host.on_frac
 
1433
                );
 
1434
            }
 
1435
            g_reply->insert_message(buf, "low");
 
1436
        }
 
1437
        if (g_wreq->hr_reject_temp) {
 
1438
            g_reply->insert_message(
 
1439
                "Tasks are committed to other platforms",
 
1440
                "low"
 
1441
            );
 
1442
        }
 
1443
        if (g_wreq->hr_reject_perm) {
 
1444
            g_reply->insert_message(
 
1445
                _("Your computer type is not supported by this project"),
 
1446
                "notice"
 
1447
            );
 
1448
        }
 
1449
        if (g_wreq->outdated_client) {
 
1450
            g_reply->insert_message(
 
1451
                _("Newer BOINC version required; please install current version"),
 
1452
                "notice"
 
1453
            );
 
1454
            g_reply->set_delay(DELAY_NO_WORK_PERM);
 
1455
            log_messages.printf(MSG_NORMAL,
 
1456
                "Not sending work because newer client version required\n"
 
1457
            );
 
1458
        }
 
1459
        if (g_wreq->no_cuda_prefs) {
 
1460
            g_reply->insert_message(
 
1461
                _("Tasks for NVIDIA GPU are available, but your preferences are set to not accept them"),
 
1462
                "notice"
 
1463
            );
 
1464
        }
 
1465
        if (g_wreq->no_ati_prefs) {
 
1466
            g_reply->insert_message(
 
1467
                _("Tasks for ATI GPU are available, but your preferences are set to not accept them"),
 
1468
                "notice"
 
1469
            );
 
1470
        }
 
1471
        if (g_wreq->no_cpu_prefs) {
 
1472
            g_reply->insert_message(
 
1473
                _("Tasks for CPU are available, but your preferences are set to not accept them"),
 
1474
                "notice"
 
1475
            );
 
1476
        }
 
1477
        DB_HOST_APP_VERSION* havp = quota_exceeded_version();
 
1478
        if (havp) {
 
1479
            sprintf(buf, "This computer has finished a daily quota of %d tasks)",
 
1480
                havp->max_jobs_per_day
 
1481
            );
 
1482
            g_reply->insert_message(buf, "low");
 
1483
            if (config.debug_quota) {
 
1484
                log_messages.printf(MSG_NORMAL,
 
1485
                    "[quota] Daily quota %d exceeded for app version %d\n",
 
1486
                    havp->max_jobs_per_day, havp->app_version_id
 
1487
                );
 
1488
            }
 
1489
            g_reply->set_delay(DELAY_NO_WORK_CACHE);
 
1490
        }
 
1491
        if (g_wreq->max_jobs_on_host_exceeded
 
1492
            || g_wreq->max_jobs_on_host_cpu_exceeded
 
1493
            || g_wreq->max_jobs_on_host_gpu_exceeded
 
1494
        ) {
 
1495
            sprintf(buf, "This computer has reached a limit on tasks in progress");
 
1496
            g_reply->insert_message(buf, "low");
 
1497
            g_reply->set_delay(DELAY_NO_WORK_CACHE);
 
1498
        }
 
1499
    }
 
1500
}
 
1501
 
 
1502
static double clamp_req_sec(double x) {
 
1503
    if (x < MIN_REQ_SECS) return MIN_REQ_SECS;
 
1504
    if (x > MAX_REQ_SECS) return MAX_REQ_SECS;
 
1505
    return x;
 
1506
}
 
1507
 
 
1508
// prepare to send jobs, both resent and new;
 
1509
// decipher request type, fill in WORK_REQ
 
1510
//
 
1511
void send_work_setup() {
 
1512
    unsigned int i;
 
1513
 
 
1514
    g_wreq->seconds_to_fill = clamp_req_sec(g_request->work_req_seconds);
 
1515
    g_wreq->cpu_req_secs = clamp_req_sec(g_request->cpu_req_secs);
 
1516
    g_wreq->cpu_req_instances = g_request->cpu_req_instances;
 
1517
    g_wreq->anonymous_platform = is_anonymous(g_request->platforms.list[0]);
 
1518
 
 
1519
    if (g_wreq->anonymous_platform) {
 
1520
        estimate_flops_anon_platform();
 
1521
    }
 
1522
    cuda_requirements.clear();
 
1523
    ati_requirements.clear();
 
1524
 
 
1525
    g_wreq->disk_available = max_allowable_disk();
 
1526
    get_mem_sizes();
 
1527
    get_running_frac();
 
1528
    g_wreq->get_job_limits();
 
1529
 
 
1530
    if (g_request->coprocs.cuda.count) {
 
1531
        g_wreq->cuda_req_secs = clamp_req_sec(g_request->coprocs.cuda.req_secs);
 
1532
        g_wreq->cuda_req_instances = g_request->coprocs.cuda.req_instances;
 
1533
        if (g_request->coprocs.cuda.estimated_delay < 0) {
 
1534
            g_request->coprocs.cuda.estimated_delay = g_request->cpu_estimated_delay;
 
1535
        }
 
1536
    }
 
1537
    if (g_request->coprocs.ati.count) {
 
1538
        g_wreq->ati_req_secs = clamp_req_sec(g_request->coprocs.ati.req_secs);
 
1539
        g_wreq->ati_req_instances = g_request->coprocs.ati.req_instances;
 
1540
        if (g_request->coprocs.ati.estimated_delay < 0) {
 
1541
            g_request->coprocs.ati.estimated_delay = g_request->cpu_estimated_delay;
 
1542
        }
 
1543
    }
 
1544
    if (g_wreq->cpu_req_secs || g_wreq->cuda_req_secs || g_wreq->ati_req_secs) {
 
1545
        g_wreq->rsc_spec_request = true;
1287
1546
    } else {
1288
 
        reply.wreq.running_frac = reply.host.active_frac * reply.host.on_frac;
1289
 
    }
1290
 
    if (reply.wreq.running_frac < HOST_ACTIVE_FRAC_MIN) {
1291
 
        reply.wreq.running_frac = HOST_ACTIVE_FRAC_MIN;
1292
 
    }
1293
 
    if (reply.wreq.running_frac > 1) reply.wreq.running_frac = 1;
1294
 
}
1295
 
 
1296
 
static void send_work_old(SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply) {
1297
 
    reply.wreq.beta_only = false;
1298
 
    reply.wreq.user_apps_only = true;
1299
 
 
1300
 
    // give top priority to results that require a 'reliable host'
1301
 
    //
1302
 
    if (reply.wreq.host_info.reliable) {
1303
 
        reply.wreq.reliable_only = true;
1304
 
        reply.wreq.infeasible_only = false;
1305
 
        scan_work_array(sreq, reply);
1306
 
    }
1307
 
    reply.wreq.reliable_only = false;
1308
 
 
1309
 
    // give 2nd priority to results for a beta app
1310
 
    // (projects should load beta work with care,
1311
 
    // otherwise your users won't get production work done!
1312
 
    //
1313
 
    if (reply.wreq.host_info.allow_beta_work) {
1314
 
        reply.wreq.beta_only = true;
1315
 
        if (config.debug_send) {
1316
 
            log_messages.printf(MSG_DEBUG,
1317
 
                "[HOST#%d] will accept beta work.  Scanning for beta work.\n",
1318
 
                reply.host.id
1319
 
            );
1320
 
        }
1321
 
        scan_work_array(sreq, reply);
1322
 
    }
1323
 
    reply.wreq.beta_only = false;
1324
 
 
1325
 
    // give next priority to results that were infeasible for some other host
1326
 
    //
1327
 
    reply.wreq.infeasible_only = true;
1328
 
    scan_work_array(sreq, reply);
1329
 
 
1330
 
    reply.wreq.infeasible_only = false;
1331
 
    scan_work_array(sreq, reply);
1332
 
    
1333
 
    // If user has selected apps but will accept any,
1334
 
    // and we haven't found any jobs for selected apps, try others
1335
 
    //
1336
 
    if (!reply.wreq.nresults && reply.wreq.host_info.allow_non_preferred_apps ) {
1337
 
        reply.wreq.user_apps_only = false;
1338
 
        preferred_app_message_index = reply.wreq.no_work_messages.size();
1339
 
        if (config.debug_send) {
1340
 
            log_messages.printf(MSG_DEBUG,
1341
 
                "[HOST#%d] is looking for work from a non-preferred application\n",
1342
 
                reply.host.id
1343
 
            );
1344
 
        }
1345
 
        scan_work_array(sreq, reply);
1346
 
    }
1347
 
}
1348
 
 
1349
 
#define ER_MAX  0.05
1350
 
// decide whether to unreplicated jobs to this host
1351
 
//
1352
 
void set_trust(SCHEDULER_REPLY& reply) {
1353
 
    reply.wreq.trust = false;
1354
 
    if (reply.host.error_rate > ER_MAX) {
1355
 
        if (config.debug_send) {
1356
 
            log_messages.printf(MSG_DEBUG,
1357
 
                "set_trust: error rate %f > %f, don't trust\n",
1358
 
                reply.host.error_rate, ER_MAX
1359
 
            );
1360
 
        }
1361
 
        return;
1362
 
    }
1363
 
    double x = sqrt(reply.host.error_rate/ER_MAX);
1364
 
    if (drand() > x) reply.wreq.trust = true;
1365
 
    if (config.debug_send) {
1366
 
        log_messages.printf(MSG_DEBUG,
1367
 
            "set_trust: random choice for error rate %f: %s\n",
1368
 
            reply.host.error_rate, reply.wreq.trust?"yes":"no"
1369
 
        );
1370
 
    }
1371
 
}
1372
 
 
1373
 
void send_work(SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply) {
1374
 
    if (sreq.work_req_seconds <= 0) return;
1375
 
 
1376
 
    reply.wreq.disk_available = max_allowable_disk(sreq, reply);
1377
 
 
1378
 
    if (hr_unknown_platform(sreq.host)) {
1379
 
        reply.wreq.hr_reject_perm = true;
1380
 
        return;
1381
 
    }
1382
 
 
1383
 
    get_host_info(reply); // parse project prefs for app details
1384
 
 
1385
 
    set_trust(reply);
1386
 
 
1387
 
    get_running_frac(sreq, reply);
1388
 
 
1389
 
    if (config.debug_send) {
1390
 
        log_messages.printf(MSG_DEBUG,
1391
 
            "%s matchmaker scheduling; %s EDF sim\n",
 
1547
        g_wreq->rsc_spec_request = false;
 
1548
    }
 
1549
 
 
1550
    for (i=0; i<g_request->other_results.size(); i++) {
 
1551
        OTHER_RESULT& r = g_request->other_results[i];
 
1552
        APP* app = NULL;
 
1553
        bool uses_gpu = false;
 
1554
        bool have_cav = false;
 
1555
        if (r.app_version >= 0
 
1556
            && r.app_version < (int)g_request->client_app_versions.size()
 
1557
        ) {
 
1558
            CLIENT_APP_VERSION& cav = g_request->client_app_versions[r.app_version];
 
1559
            app = cav.app;
 
1560
            if (app) {
 
1561
                have_cav = true;
 
1562
                uses_gpu = cav.host_usage.uses_gpu();
 
1563
            }
 
1564
        }
 
1565
        if (!have_cav) {
 
1566
            if (r.have_plan_class && app_plan_uses_gpu(r.plan_class)) {
 
1567
                uses_gpu = true;
 
1568
            }
 
1569
        }
 
1570
        config.max_jobs_in_progress.register_job(app, uses_gpu);
 
1571
    }
 
1572
 
 
1573
    // print details of request to log
 
1574
    //
 
1575
    if (config.debug_send) {
 
1576
        log_messages.printf(MSG_NORMAL,
 
1577
            "[send] %s matchmaker scheduling; %s EDF sim\n",
1392
1578
            config.matchmaker?"Using":"Not using",
1393
1579
            config.workload_sim?"Using":"Not using"
1394
1580
        );
1395
 
        log_messages.printf(MSG_DEBUG,
1396
 
            "available disk %f GB, work_buf_min %d\n",
1397
 
            reply.wreq.disk_available/GIGA,
1398
 
            (int)sreq.global_prefs.work_buf_min()
1399
 
        );
1400
 
        log_messages.printf(MSG_DEBUG,
1401
 
            "running frac %f DCF %f est delay %d\n",
1402
 
            reply.wreq.running_frac,
1403
 
            reply.host.duration_correction_factor,
1404
 
            (int)sreq.estimated_delay
1405
 
        );
1406
 
    }
1407
 
 
1408
 
    reply.wreq.seconds_to_fill = sreq.work_req_seconds;
1409
 
    if (reply.wreq.seconds_to_fill > MAX_SECONDS_TO_SEND) {
1410
 
        reply.wreq.seconds_to_fill = MAX_SECONDS_TO_SEND;
1411
 
    }
1412
 
    if (reply.wreq.seconds_to_fill < MIN_SECONDS_TO_SEND) {
1413
 
        reply.wreq.seconds_to_fill = MIN_SECONDS_TO_SEND;
1414
 
    }
 
1581
        log_messages.printf(MSG_NORMAL,
 
1582
            "[send] CPU: req %.2f sec, %.2f instances; est delay %.2f\n",
 
1583
            g_wreq->cpu_req_secs, g_wreq->cpu_req_instances,
 
1584
            g_request->cpu_estimated_delay
 
1585
        );
 
1586
        if (g_request->coprocs.cuda.count) {
 
1587
            log_messages.printf(MSG_NORMAL,
 
1588
                "[send] CUDA: req %.2f sec, %.2f instances; est delay %.2f\n",
 
1589
                g_wreq->cuda_req_secs, g_wreq->cuda_req_instances,
 
1590
                g_request->coprocs.cuda.estimated_delay
 
1591
            );
 
1592
        }
 
1593
        if (g_request->coprocs.ati.count) {
 
1594
            log_messages.printf(MSG_NORMAL,
 
1595
                "[send] ATI: req %.2f sec, %.2f instances; est delay %.2f\n",
 
1596
                g_wreq->ati_req_secs, g_wreq->ati_req_instances,
 
1597
                g_request->coprocs.ati.estimated_delay
 
1598
            );
 
1599
        }
 
1600
        log_messages.printf(MSG_NORMAL,
 
1601
            "[send] work_req_seconds: %.2f secs\n",
 
1602
            g_wreq->seconds_to_fill
 
1603
        );
 
1604
        log_messages.printf(MSG_NORMAL,
 
1605
            "[send] available disk %.2f GB, work_buf_min %d\n",
 
1606
            g_wreq->disk_available/GIGA,
 
1607
            (int)g_request->global_prefs.work_buf_min()
 
1608
        );
 
1609
        log_messages.printf(MSG_NORMAL,
 
1610
            "[send] active_frac %f on_frac %f\n",
 
1611
            g_reply->host.active_frac,
 
1612
            g_reply->host.on_frac
 
1613
        );
 
1614
        if (g_wreq->anonymous_platform) {
 
1615
            log_messages.printf(MSG_NORMAL,
 
1616
                "Anonymous platform app versions:\n"
 
1617
            );
 
1618
            for (i=0; i<g_request->client_app_versions.size(); i++) {
 
1619
                CLIENT_APP_VERSION& cav = g_request->client_app_versions[i];
 
1620
                log_messages.printf(MSG_NORMAL,
 
1621
                    "   app: %s version %d cpus %.2f cudas %.2f atis %.2f flops %fG\n",
 
1622
                    cav.app_name,
 
1623
                    cav.version_num,
 
1624
                    cav.host_usage.avg_ncpus,
 
1625
                    cav.host_usage.ncudas,
 
1626
                    cav.host_usage.natis,
 
1627
                    cav.host_usage.projected_flops/1e9
 
1628
                );
 
1629
            }
 
1630
        }
 
1631
    }
 
1632
}
 
1633
 
 
1634
// If a record is not in DB, create it.
 
1635
//
 
1636
int update_host_app_versions(vector<RESULT>& results, int hostid) {
 
1637
    vector<DB_HOST_APP_VERSION> new_havs;
 
1638
    unsigned int i, j;
 
1639
    int retval;
 
1640
 
 
1641
    for (i=0; i<results.size(); i++) {
 
1642
        RESULT& r = results[i];
 
1643
        int gavid = generalized_app_version_id(r.app_version_id, r.appid);
 
1644
        DB_HOST_APP_VERSION* havp = gavid_to_havp(gavid);
 
1645
        if (!havp) {
 
1646
            bool found = false;
 
1647
            for (j=0; j<new_havs.size(); j++) {
 
1648
                DB_HOST_APP_VERSION& hav = new_havs[j];
 
1649
                if (hav.app_version_id == gavid) {
 
1650
                    found = true;
 
1651
                }
 
1652
            }
 
1653
            if (!found) {
 
1654
                DB_HOST_APP_VERSION hav;
 
1655
                hav.clear();
 
1656
                hav.host_id = hostid;
 
1657
                hav.app_version_id = gavid;
 
1658
                new_havs.push_back(hav);
 
1659
            }
 
1660
        }
 
1661
    }
 
1662
 
 
1663
    // create new records
 
1664
    //
 
1665
    for (i=0; i<new_havs.size(); i++) {
 
1666
        DB_HOST_APP_VERSION& hav = new_havs[i];
 
1667
 
 
1668
        retval = hav.insert();
 
1669
        if (retval) {
 
1670
            log_messages.printf(MSG_CRITICAL,
 
1671
                "hav.insert(): %d\n", retval
 
1672
            );
 
1673
        } else {
 
1674
            if (config.debug_credit) {
 
1675
                log_messages.printf(MSG_NORMAL,
 
1676
                    "[credit] created host_app_version record (%d, %d)\n",
 
1677
                    hav.host_id, hav.app_version_id
 
1678
                );
 
1679
            }
 
1680
        }
 
1681
    }
 
1682
    return 0;
 
1683
}
 
1684
 
 
1685
void send_work() {
 
1686
    int retval;
 
1687
 
 
1688
    if (!work_needed(false)) {
 
1689
        send_user_messages();
 
1690
        return;
 
1691
    }
 
1692
    g_wreq->no_jobs_available = true;
 
1693
 
 
1694
    if (!g_wreq->rsc_spec_request && g_wreq->seconds_to_fill == 0) {
 
1695
        return;
 
1696
    }
 
1697
 
 
1698
    if (all_apps_use_hr && hr_unknown_platform(g_request->host)) {
 
1699
        log_messages.printf(MSG_NORMAL,
 
1700
            "Not sending work because unknown HR class\n"
 
1701
        );
 
1702
        g_wreq->hr_reject_perm = true;
 
1703
        return;
 
1704
    }
 
1705
 
 
1706
    // decide on attributes of HOST_APP_VERSIONS
 
1707
    //
 
1708
    get_reliability_and_trust();
 
1709
 
 
1710
    get_prefs_info();
1415
1711
 
1416
1712
    if (config.enable_assignment) {
1417
 
        if (send_assigned_jobs(sreq, reply)) {
 
1713
        if (send_assigned_jobs()) {
1418
1714
            if (config.debug_assignment) {
1419
 
                log_messages.printf(MSG_DEBUG,
1420
 
                    "[HOST#%d] sent assigned jobs\n", reply.host.id
 
1715
                log_messages.printf(MSG_NORMAL,
 
1716
                    "[assign] [HOST#%d] sent assigned jobs\n", g_reply->host.id
1421
1717
                );
1422
1718
            }
1423
 
            return;
 
1719
            goto done;
1424
1720
        }
1425
1721
    }
1426
1722
 
1427
 
    if (config.workload_sim && sreq.have_other_results_list) {
 
1723
    if (config.workload_sim && g_request->have_other_results_list) {
1428
1724
        init_ip_results(
1429
 
            sreq.global_prefs.work_buf_min(), effective_ncpus(sreq, reply), sreq.ip_results
 
1725
            g_request->global_prefs.work_buf_min(),
 
1726
            g_wreq->effective_ncpus, g_request->ip_results
1430
1727
        );
1431
1728
    }
1432
1729
 
1433
 
    if (config.locality_scheduling) {
1434
 
        reply.wreq.infeasible_only = false;
1435
 
        send_work_locality(sreq, reply);
 
1730
    if (config.locality_scheduler_fraction > 0) {
 
1731
        if (drand() < config.locality_scheduler_fraction) {
 
1732
            if (config.debug_locality) {
 
1733
                log_messages.printf(MSG_NORMAL,
 
1734
                    "[mixed] sending locality work first\n"
 
1735
                );
 
1736
            }
 
1737
            send_work_locality();
 
1738
            if (config.debug_locality) {
 
1739
                log_messages.printf(MSG_NORMAL,
 
1740
                    "[mixed] sending non-locality work second\n"
 
1741
                );
 
1742
            }
 
1743
            send_work_old();
 
1744
        } else {
 
1745
            if (config.debug_locality) {
 
1746
                log_messages.printf(MSG_NORMAL,
 
1747
                    "[mixed] sending non-locality work first\n"
 
1748
                );
 
1749
            }
 
1750
            send_work_old();
 
1751
            if (config.debug_locality) {
 
1752
                log_messages.printf(MSG_NORMAL,
 
1753
                    "[mixed] sending locality work second\n"
 
1754
                );
 
1755
            }
 
1756
            send_work_locality();
 
1757
        }
 
1758
    } else if (config.locality_scheduling) {
 
1759
        send_work_locality();
1436
1760
    } else if (config.matchmaker) {
1437
 
        send_work_matchmaker(sreq, reply);
1438
 
    } else {
1439
 
        send_work_old(sreq, reply);
1440
 
    }
1441
 
 
1442
 
    explain_to_user(sreq, reply);
1443
 
}
1444
 
 
1445
 
// Matchmaker scheduling code follows
1446
 
 
1447
 
struct JOB {
1448
 
    int index;
1449
 
    double score;
1450
 
    double est_time;
1451
 
    double disk_usage;
1452
 
    APP* app;
1453
 
    BEST_APP_VERSION* bavp;
1454
 
 
1455
 
    bool get_score(SCHEDULER_REQUEST&, SCHEDULER_REPLY&);
1456
 
};
1457
 
 
1458
 
struct JOB_SET {
1459
 
    double work_req;
1460
 
    double est_time;
1461
 
    double disk_usage;
1462
 
    double disk_limit;
1463
 
    int max_jobs;
1464
 
    std::list<JOB> jobs;     // sorted high to low
1465
 
 
1466
 
    JOB_SET(SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply) {
1467
 
        work_req = sreq.work_req_seconds;
1468
 
        est_time = 0;
1469
 
        disk_usage = 0;
1470
 
        disk_limit = reply.wreq.disk_available;
1471
 
        max_jobs = config.max_wus_to_send;
1472
 
        int ncpus = effective_ncpus(sreq, reply), n;
1473
 
 
1474
 
        if (config.daily_result_quota) {
1475
 
            if (reply.host.max_results_day == 0 || reply.host.max_results_day>config.daily_result_quota) {
1476
 
                reply.host.max_results_day = config.daily_result_quota;
1477
 
            }
1478
 
            reply.wreq.daily_result_quota = ncpus*reply.host.max_results_day;
1479
 
            n = reply.wreq.daily_result_quota - reply.host.nresults_today;
1480
 
            if (n < 0) n = 0;
1481
 
            if (n < max_jobs) max_jobs = n;
1482
 
        }
1483
 
 
1484
 
        if (config.max_wus_in_progress) {
1485
 
            n = config.max_wus_in_progress*ncpus - reply.wreq.nresults_on_host;
1486
 
            if (n < 0) n = 0;
1487
 
            if (n < max_jobs) max_jobs = n;
1488
 
        }
1489
 
    }
1490
 
    void add_job(JOB&);
1491
 
    double higher_score_disk_usage(double);
1492
 
    double lowest_score();
1493
 
    inline bool request_satisfied() {
1494
 
        return est_time >= work_req;
1495
 
    }
1496
 
    void send(SCHEDULER_REQUEST&, SCHEDULER_REPLY&);
1497
 
};
1498
 
 
1499
 
// reread result from DB, make sure it's still unsent
1500
 
// TODO: from here to add_result_to_reply()
1501
 
// (which updates the DB record) should be a transaction
1502
 
//
1503
 
int read_sendable_result(DB_RESULT& result) {
1504
 
    int retval = result.lookup_id(result.id);
1505
 
    if (retval) {
1506
 
        log_messages.printf(MSG_CRITICAL,
1507
 
            "[RESULT#%d] result.lookup_id() failed %d\n",
1508
 
            result.id, retval
1509
 
        );
1510
 
        return ERR_NOT_FOUND;
1511
 
    }
1512
 
    if (result.server_state != RESULT_SERVER_STATE_UNSENT) {
1513
 
        log_messages.printf(MSG_NORMAL,
1514
 
            "[RESULT#%d] expected to be unsent; instead, state is %d\n",
1515
 
            result.id, result.server_state
1516
 
        );
1517
 
        return ERR_BAD_RESULT_STATE;
1518
 
    }
1519
 
    return 0;
1520
 
}
1521
 
 
1522
 
// compute a "score" for sending this job to this host.
1523
 
// Return false if the WU is infeasible.
1524
 
// Otherwise set est_time and disk_usage.
1525
 
//
1526
 
bool JOB::get_score(SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply) {
1527
 
    WORKUNIT wu;
1528
 
    int retval;
1529
 
 
1530
 
    WU_RESULT& wu_result = ssp->wu_results[index];
1531
 
    wu = wu_result.workunit;
1532
 
    app = ssp->lookup_app(wu.appid);
1533
 
 
1534
 
    score = 0;
1535
 
 
1536
 
    // Find the app_version for the client's platform.
1537
 
    //
1538
 
    bavp = get_app_version(sreq, reply, wu);
1539
 
    if (!bavp) return false;
1540
 
 
1541
 
    retval = wu_is_infeasible_fast(wu, sreq, reply, *app);
1542
 
    if (retval) {
1543
 
        if (config.debug_send) {
1544
 
            log_messages.printf(MSG_DEBUG,
1545
 
                "[HOST#%d] [WU#%d %s] WU is infeasible: %s\n",
1546
 
                reply.host.id, wu.id, wu.name, infeasible_string(retval)
1547
 
            );
1548
 
        }
1549
 
        return false;
1550
 
    }
1551
 
 
1552
 
    score = 1;
1553
 
 
1554
 
    // check if user has selected apps,
1555
 
    // and send beta work to beta users
1556
 
    //
1557
 
    if (app->beta && !config.distinct_beta_apps) {
1558
 
        if (reply.wreq.host_info.allow_beta_work) {
1559
 
            score += 1;
1560
 
        } else {
1561
 
            return false;
1562
 
        }
1563
 
    } else {
1564
 
        if (app_not_selected(wu, sreq, reply)) {
1565
 
            if (!reply.wreq.host_info.allow_non_preferred_apps) {
1566
 
                return false;
1567
 
            } else {
1568
 
            // Allow work to be sent, but it will not get a bump in its score
1569
 
            }
1570
 
        } else {
1571
 
            score += 1;
1572
 
        }
1573
 
    }
1574
 
            
1575
 
    // if job needs to get done fast, send to fast/reliable host
1576
 
    //
1577
 
    if (reply.wreq.host_info.reliable && (wu_result.need_reliable)) {
1578
 
        score += 1;
1579
 
    }
1580
 
    
1581
 
    // if job already committed to an HR class,
1582
 
    // try to send to host in that class
1583
 
    //
1584
 
    if (wu_result.infeasible_count) {
1585
 
        score += 1;
1586
 
    }
1587
 
 
1588
 
    // Favor jobs that will run fast
1589
 
    //
1590
 
    score += bavp->host_usage.flops/1e9;
1591
 
 
1592
 
    // match large jobs to fast hosts
1593
 
    //
1594
 
    if (config.job_size_matching) {
1595
 
        double host_stdev = (reply.host.p_fpops - ssp->perf_info.host_fpops_mean)/ ssp->perf_info.host_fpops_stdev;
1596
 
        double diff = host_stdev - wu_result.fpops_size;
1597
 
        score -= diff*diff;
1598
 
    }
1599
 
 
1600
 
    // TODO: If user has selected some apps but will accept jobs from others,
1601
 
    // try to send them jobs from the selected apps
1602
 
    //
1603
 
 
1604
 
    est_time = estimate_wallclock_duration(wu, sreq, reply);
1605
 
    disk_usage = wu.rsc_disk_bound;
1606
 
    return true;
1607
 
}
1608
 
 
1609
 
bool wu_is_infeasible_slow(
1610
 
    WU_RESULT& wu_result, SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply
1611
 
) {
1612
 
    char buf[256];
1613
 
    int retval;
1614
 
    int n;
1615
 
    DB_RESULT result;
1616
 
 
1617
 
    // Don't send if we've already sent a result of this WU to this user.
1618
 
    //
1619
 
    if (config.one_result_per_user_per_wu) {
1620
 
        sprintf(buf,
1621
 
            "where workunitid=%d and userid=%d",
1622
 
            wu_result.workunit.id, reply.user.id
1623
 
        );
1624
 
        retval = result.count(n, buf);
1625
 
        if (retval) {
1626
 
            log_messages.printf(MSG_CRITICAL,
1627
 
                "send_work: can't get result count (%d)\n", retval
1628
 
            );
1629
 
            return true;
1630
 
        } else {
1631
 
            if (n>0) {
1632
 
                if (config.debug_send) {
1633
 
                    log_messages.printf(MSG_DEBUG,
1634
 
                        "send_work: user %d already has %d result(s) for WU %d\n",
1635
 
                        reply.user.id, n, wu_result.workunit.id
1636
 
                    );
1637
 
                }
1638
 
                return true;
1639
 
            }
1640
 
        }
1641
 
    } else if (config.one_result_per_host_per_wu) {
1642
 
        // Don't send if we've already sent a result
1643
 
        // of this WU to this host.
1644
 
        // We only have to check this
1645
 
        // if we don't send one result per user.
1646
 
        //
1647
 
        sprintf(buf,
1648
 
            "where workunitid=%d and hostid=%d",
1649
 
            wu_result.workunit.id, reply.host.id
1650
 
        );
1651
 
        retval = result.count(n, buf);
1652
 
        if (retval) {
1653
 
            log_messages.printf(MSG_CRITICAL,
1654
 
                "send_work: can't get result count (%d)\n", retval
1655
 
            );
1656
 
            return true;
1657
 
        } else {
1658
 
            if (n>0) {
1659
 
                if (config.debug_send) {
1660
 
                    log_messages.printf(MSG_DEBUG,
1661
 
                        "send_work: host %d already has %d result(s) for WU %d\n",
1662
 
                        reply.host.id, n, wu_result.workunit.id
1663
 
                    );
1664
 
                }
1665
 
                return true;
1666
 
            }
1667
 
        }
1668
 
    }
1669
 
 
1670
 
    APP* app = ssp->lookup_app(wu_result.workunit.appid);
1671
 
    WORKUNIT wu = wu_result.workunit;
1672
 
    if (app_hr_type(*app)) {
1673
 
        if (already_sent_to_different_platform_careful(
1674
 
            sreq, reply.wreq, wu, *app
1675
 
        )) {
1676
 
            if (config.debug_send) {
1677
 
                log_messages.printf(MSG_DEBUG,
1678
 
                    "[HOST#%d] [WU#%d %s] WU is infeasible (assigned to different platform)\n",
1679
 
                    reply.host.id, wu.id, wu.name
1680
 
                );
1681
 
            }
1682
 
            // Mark the workunit as infeasible.
1683
 
            // This ensures that jobs already assigned to a platform
1684
 
            // are processed first.
1685
 
            //
1686
 
            wu_result.infeasible_count++;
1687
 
            return true;
1688
 
        }
1689
 
    }
1690
 
    return false;
1691
 
}
1692
 
 
1693
 
double JOB_SET::lowest_score() {
1694
 
    if (jobs.empty()) return 0;
1695
 
    return jobs.back().score;
1696
 
}
1697
 
 
1698
 
// add the given job, and remove lowest-score jobs that
1699
 
// - are in excess of work request
1700
 
// - are in excess of per-request or per-day limits
1701
 
// - cause the disk limit to be exceeded
1702
 
//
1703
 
void JOB_SET::add_job(JOB& job) {
1704
 
    while (!jobs.empty()) {
1705
 
        JOB& worst_job = jobs.back();
1706
 
        if (est_time + job.est_time - worst_job.est_time > work_req) {
1707
 
            est_time -= worst_job.est_time;
1708
 
            disk_usage -= worst_job.disk_usage;
1709
 
            jobs.pop_back();
1710
 
            ssp->wu_results[worst_job.index].state = WR_STATE_PRESENT;
1711
 
        } else {
1712
 
            break;
1713
 
        }
1714
 
    }
1715
 
    while (!jobs.empty()) {
1716
 
        JOB& worst_job = jobs.back();
1717
 
        if (disk_usage + job.disk_usage > disk_limit) {
1718
 
            est_time -= worst_job.est_time;
1719
 
            disk_usage -= worst_job.disk_usage;
1720
 
            jobs.pop_back();
1721
 
            ssp->wu_results[worst_job.index].state = WR_STATE_PRESENT;
1722
 
        } else {
1723
 
            break;
1724
 
        }
1725
 
    }
1726
 
 
1727
 
    if (jobs.size() == max_jobs) {
1728
 
        JOB& worst_job = jobs.back();
1729
 
        jobs.pop_back();
1730
 
        ssp->wu_results[worst_job.index].state = WR_STATE_PRESENT;
1731
 
    }
1732
 
 
1733
 
    list<JOB>::iterator i = jobs.begin();
1734
 
    while (i != jobs.end()) {
1735
 
        if (i->score < job.score) {
1736
 
            jobs.insert(i, job);
1737
 
            break;
1738
 
        }
1739
 
        i++;
1740
 
    }
1741
 
    if (i == jobs.end()) {
1742
 
        jobs.push_back(job);
1743
 
    }
1744
 
    est_time += job.est_time;
1745
 
    disk_usage += job.disk_usage;
1746
 
    if (config.debug_send) {
1747
 
        log_messages.printf(MSG_DEBUG,
1748
 
            "added job to set.  est_time %f disk_usage %f\n",
1749
 
            est_time, disk_usage
1750
 
        );
1751
 
    }
1752
 
}
1753
 
 
1754
 
// return the disk usage of jobs above the given score
1755
 
//
1756
 
double JOB_SET::higher_score_disk_usage(double v) {
1757
 
    double sum = 0;
1758
 
    list<JOB>::iterator i = jobs.begin();
1759
 
    while (i != jobs.end()) {
1760
 
        if (i->score < v) break;
1761
 
        sum += i->disk_usage;
1762
 
        i++;
1763
 
    }
1764
 
    return sum;
1765
 
}
1766
 
 
1767
 
void JOB_SET::send(SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply) {
1768
 
    WORKUNIT wu;
1769
 
    DB_RESULT result;
1770
 
    int retval;
1771
 
 
1772
 
    list<JOB>::iterator i = jobs.begin();
1773
 
    while (i != jobs.end()) {
1774
 
        JOB& job = *(i++);
1775
 
        WU_RESULT wu_result = ssp->wu_results[job.index];
1776
 
        ssp->wu_results[job.index].state = WR_STATE_EMPTY;
1777
 
        wu = wu_result.workunit;
1778
 
        result.id = wu_result.resultid;
1779
 
        retval = read_sendable_result(result);
1780
 
        if (!retval) {
1781
 
            add_result_to_reply(result, wu, sreq, reply, job.bavp);
1782
 
        }
1783
 
    }
1784
 
}
1785
 
 
1786
 
void send_work_matchmaker(SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply) {
1787
 
    int i, slots_locked=0, slots_nonempty=0;
1788
 
    JOB_SET jobs (sreq, reply);
1789
 
    int min_slots = config.mm_min_slots;
1790
 
    if (!min_slots) min_slots = ssp->max_wu_results/2;
1791
 
    int max_slots = config.mm_max_slots;
1792
 
    if (!max_slots) max_slots = ssp->max_wu_results;
1793
 
    int max_locked = 10;
1794
 
 
1795
 
    lock_sema();
1796
 
    i = rand() % ssp->max_wu_results;
1797
 
 
1798
 
    // scan through the job cache, maintaining a JOB_SET of jobs
1799
 
    // that we can send to this client, ordered by score.
1800
 
    //
1801
 
    for (int slots_scanned=0; slots_scanned<max_slots; slots_scanned++) {
1802
 
        i = (i+1) % ssp->max_wu_results;
1803
 
        WU_RESULT& wu_result = ssp->wu_results[i];
1804
 
        switch (wu_result.state) {
1805
 
        case WR_STATE_EMPTY:
1806
 
            continue;
1807
 
        case WR_STATE_PRESENT:
1808
 
            slots_nonempty++;
1809
 
            break;
1810
 
        default:
1811
 
            slots_nonempty++;
1812
 
            if (wu_result.state == g_pid) break;
1813
 
            slots_locked++;
1814
 
            continue;
1815
 
        }
1816
 
 
1817
 
        JOB job;
1818
 
        job.index = i;
1819
 
 
1820
 
        // get score for this job, and skip it if it fails quick check.
1821
 
        // NOTE: the EDF check done in get_score()
1822
 
        // includes only in-progress jobs.
1823
 
        //
1824
 
        if (!job.get_score(sreq, reply)) {
1825
 
            continue;
1826
 
        }
1827
 
        if (config.debug_send) {
1828
 
            log_messages.printf(MSG_DEBUG,
1829
 
                "score for %s: %f\n", wu_result.workunit.name, job.score
1830
 
            );
1831
 
        }
1832
 
 
1833
 
        if (job.score > jobs.lowest_score() || !jobs.request_satisfied()) {
1834
 
            ssp->wu_results[i].state = g_pid;
1835
 
            unlock_sema();
1836
 
            if (wu_is_infeasible_slow(wu_result, sreq, reply)) {
1837
 
                // if we can't use this job, put it back in pool
1838
 
                //
1839
 
                lock_sema();
1840
 
                ssp->wu_results[i].state = WR_STATE_PRESENT;
1841
 
                continue;
1842
 
            }
1843
 
            lock_sema();
1844
 
            jobs.add_job(job);
1845
 
        }
1846
 
 
1847
 
        if (jobs.request_satisfied() && slots_scanned>=min_slots) break;
1848
 
    }
1849
 
 
1850
 
    if (!slots_nonempty) {
1851
 
        log_messages.printf(MSG_CRITICAL,
1852
 
            "Job cache is empty - check feeder\n"
1853
 
        );
1854
 
        reply.wreq.no_jobs_available = true;
1855
 
    }
1856
 
 
1857
 
    // TODO: trim jobs from tail of list until we pass the EDF check
1858
 
    //
1859
 
    jobs.send(sreq, reply);
1860
 
    unlock_sema();
1861
 
    if (slots_locked > max_locked) {
1862
 
        log_messages.printf(MSG_CRITICAL,
1863
 
            "Found too many locked slots (%d>%d) - increase array size",
1864
 
            slots_locked, max_locked
1865
 
        );
1866
 
    }
1867
 
}
1868
 
 
1869
 
const char *BOINC_RCSID_32dcd335e7 = "$Id: sched_send.cpp 16611 2008-12-03 20:55:22Z romw $";
 
1761
        send_work_matchmaker();
 
1762
    } else {
 
1763
        send_work_old();
 
1764
    }
 
1765
 
 
1766
done:
 
1767
    retval = update_host_app_versions(g_reply->results, g_reply->host.id);
 
1768
    if (retval) {
 
1769
        log_messages.printf(MSG_CRITICAL,
 
1770
            "update_host_app_versions() failed: %d\n", retval
 
1771
        );
 
1772
    }
 
1773
    send_user_messages();
 
1774
}
 
1775
 
 
1776
const char *BOINC_RCSID_32dcd335e7 = "$Id: sched_send.cpp 22651 2010-11-08 17:57:13Z romw $";