~ubuntu-branches/ubuntu/lucid/boinc/lucid

« back to all changes in this revision

Viewing changes to sched/sched_send.C

  • Committer: Bazaar Package Importer
  • Author(s): Frank S. Thomas, Frank S. Thomas
  • Date: 2008-05-31 08:02:47 UTC
  • mfrom: (1.1.8 upstream)
  • Revision ID: james.westby@ubuntu.com-20080531080247-4ce890lp2rc768cr
Tags: 6.2.7-1
[ Frank S. Thomas ]
* New upstream release.
  - BOINC Manager: Redraw disk usage charts immediately after connecting to
    a (different) client. (closes: 463823)
* debian/copyright:
  - Added the instructions from debian/README.Debian-source about how
    repackaged BOINC tarballs can be reproduced because DevRef now
    recommends to put this here instead of in the afore-mentioned file.
  - Updated for the new release.
* Removed the obsolete debian/README.Debian-source.
* For consistency upstream renamed the core client and the command tool
  ("boinc_client" to "boinc" and "boinc_cmd" to "boinccmd"). Done the same
  in all packages and created symlinks with the old names for the binaries
  and man pages. Also added an entry in debian/boinc-client.NEWS explaining
  this change.
* debian/rules: Do not list Makefile.ins in the clean target individually,
  just remove all that can be found.

Show diffs side-by-side

added added

removed removed

Lines of Context:
15
15
// To view the GNU Lesser General Public License visit
16
16
// http://www.gnu.org/copyleft/lesser.html
17
17
// or write to the Free Software Foundation, Inc.,
18
 
// 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19
 
 
20
 
// scheduler code related to sending work
21
 
 
 
18
// 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
 
19
 
 
20
// scheduler code related to sending jobs
22
21
 
23
22
#include "config.h"
24
23
#include <vector>
 
24
#include <list>
25
25
#include <string>
26
26
#include <ctime>
27
27
#include <cstdio>
 
28
#include <cstring>
28
29
#include <stdlib.h>
29
30
 
30
31
using namespace std;
44
45
#include "sched_array.h"
45
46
#include "sched_msgs.h"
46
47
#include "sched_hr.h"
 
48
#include "hr.h"
47
49
#include "sched_locality.h"
48
50
#include "sched_timezone.h"
 
51
#include "sched_assign.h"
 
52
#include "sched_plan.h"
49
53
 
50
54
#include "sched_send.h"
51
55
 
55
59
#define FCGI_ToFILE(x) (x)
56
60
#endif
57
61
 
 
62
//#define MATCHMAKER
 
63
 
 
64
#ifdef MATCHMAKER
 
65
void send_work_matchmaker(SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply);
 
66
#endif
 
67
 
 
68
int preferred_app_message_index=0;
 
69
 
 
70
const char* infeasible_string(int code) {
 
71
    switch (code) {
 
72
    case INFEASIBLE_MEM: return "Not enough memory";
 
73
    case INFEASIBLE_DISK: return "Not enough disk";
 
74
    case INFEASIBLE_CPU: return "CPU too slow";
 
75
    case INFEASIBLE_APP_SETTING: return "App not selected";
 
76
    case INFEASIBLE_WORKLOAD: return "Existing workload";
 
77
    case INFEASIBLE_DUP: return "Already in reply";
 
78
    case INFEASIBLE_HR: return "Homogeneous redundancy";
 
79
    case INFEASIBLE_BANDWIDTH: return "Download bandwidth too low";
 
80
    }
 
81
    return "Unknown";
 
82
}
 
83
 
58
84
const int MIN_SECONDS_TO_SEND = 0;
59
85
const int MAX_SECONDS_TO_SEND = (28*SECONDS_IN_DAY);
60
 
const int MAX_CPUS = 8;
61
 
    // max multiplier for daily_result_quota;
62
 
    // need to change as multicore processors expand
 
86
 
 
87
inline int effective_ncpus(HOST& host) {
 
88
    int ncpus = host.p_ncpus;
 
89
    if (ncpus > config.max_ncpus) ncpus = config.max_ncpus;
 
90
    if (ncpus < 1) ncpus = 1;
 
91
    return ncpus;
 
92
}
63
93
 
64
94
const double DEFAULT_RAM_SIZE = 64000000;
65
95
    // if host sends us an impossible RAM size, use this instead
80
110
    return false;
81
111
}
82
112
 
83
 
// Find an app and app_version for the client's platform(s).
84
 
//
85
 
int get_app_version(
86
 
    WORKUNIT& wu, APP* &app, APP_VERSION* &avp,
87
 
    SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply, PLATFORM_LIST& platforms,
88
 
    SCHED_SHMEM& ss
 
113
// return BEST_APP_VERSION for the given host, or NULL if none
 
114
//
 
115
//
 
116
BEST_APP_VERSION* get_app_version(
 
117
    SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply, WORKUNIT& wu
89
118
) {
90
119
    bool found;
91
 
    if (anonymous(platforms.list[0])) {
92
 
        app = ss.lookup_app(wu.appid);
 
120
    double flops;
 
121
    unsigned int i;
 
122
    int j;
 
123
    BEST_APP_VERSION* bavp;
 
124
 
 
125
    //
 
126
    // see if app is already in memoized array
 
127
    //
 
128
    for (i=0; i<reply.wreq.best_app_versions.size(); i++) {
 
129
        bavp = reply.wreq.best_app_versions[i];
 
130
        if (bavp->appid == wu.appid) {
 
131
            if (!bavp->avp) return NULL;
 
132
            return bavp;
 
133
        }
 
134
    }
 
135
 
 
136
    APP* app = ssp->lookup_app(wu.appid);
 
137
    if (!app) {
 
138
        log_messages.printf(MSG_CRITICAL, "WU refers to nonexistent app: %d\n", wu.appid);
 
139
        return NULL;
 
140
    }
 
141
 
 
142
    bavp = new BEST_APP_VERSION;
 
143
    bavp->appid = wu.appid;
 
144
    if (anonymous(sreq.platforms.list[0])) {
93
145
        found = sreq.has_version(*app);
94
146
        if (!found) {
95
 
            log_messages.printf(SCHED_MSG_LOG::MSG_DEBUG,
96
 
                "Didn't find anonymous app\n"
 
147
            if (config.debug_send) {
 
148
                log_messages.printf(MSG_DEBUG, "Didn't find anonymous app for %s\n", app->name);
 
149
            }
 
150
            bavp->avp = 0;
 
151
        } else {
 
152
            bavp->avp = (APP_VERSION*)1;    // arbitrary nonzero value
 
153
        }
 
154
        reply.wreq.best_app_versions.push_back(bavp);
 
155
        return bavp;
 
156
    }
 
157
 
 
158
 
 
159
    // go through the client's platforms.
 
160
    // Scan the app versions for each platform.
 
161
    // Find the one with highest expected FLOPS
 
162
    //
 
163
    bavp->host_usage.flops = 0;
 
164
    bavp->avp = NULL;
 
165
    for (i=0; i<sreq.platforms.list.size(); i++) {
 
166
        PLATFORM* p = sreq.platforms.list[i];
 
167
        for (j=0; j<ssp->napp_versions; j++) {
 
168
            HOST_USAGE host_usage;
 
169
            APP_VERSION& av = ssp->app_versions[j];
 
170
            if (av.appid != wu.appid) continue;
 
171
            if (av.platformid != p->id) continue;
 
172
            if (reply.wreq.core_client_version < av.min_core_version) {
 
173
                reply.wreq.outdated_core = true;
 
174
                continue;
 
175
            }
 
176
            if (strlen(av.plan_class) && sreq.client_cap_plan_class) {
 
177
                if (!app_plan(sreq, av.plan_class, host_usage)) {
 
178
                    continue;
 
179
                }
 
180
            } else {
 
181
                host_usage.init_seq(reply.host.p_fpops);
 
182
            }
 
183
            if (host_usage.flops > bavp->host_usage.flops) {
 
184
                bavp->host_usage = host_usage;
 
185
                bavp->avp = &av;
 
186
            }
 
187
        }
 
188
    }
 
189
    reply.wreq.best_app_versions.push_back(bavp);
 
190
    if (bavp->avp) {
 
191
        if (config.debug_version_select) {
 
192
            log_messages.printf(MSG_DEBUG,
 
193
                "Best version of app %s is %d (%.2f GFLOPS)\n",
 
194
                app->name, bavp->avp->id, bavp->host_usage.flops/1e9
97
195
            );
98
 
            return ERR_NO_APP_VERSION;
99
196
        }
100
 
        avp = NULL;
101
197
    } else {
102
 
        found = find_app_version(reply.wreq, wu, platforms, ss, app, avp);
103
 
        if (!found) {
104
 
            log_messages.printf(SCHED_MSG_LOG::MSG_DEBUG, "Didn't find app version\n");
105
 
            return ERR_NO_APP_VERSION;
106
 
        }
107
 
 
108
 
        // see if the core client is too old.
 
198
        // here if no app version exists
109
199
        //
110
 
        if (!app_core_compatible(reply.wreq, *avp)) {
111
 
            log_messages.printf(SCHED_MSG_LOG::MSG_DEBUG, "Didn't find app version: core client too old\n");
112
 
            return ERR_NO_APP_VERSION;
 
200
        if (config.debug_version_select) {
 
201
            log_messages.printf(MSG_DEBUG,
 
202
                "no app version available: APP#%d PLATFORM#%d min_version %d\n",
 
203
                app->id, sreq.platforms.list[0]->id, app->min_version
 
204
            );
113
205
        }
 
206
        char message[256];
 
207
        sprintf(message,
 
208
            "%s is not available for your type of computer.",
 
209
            app->user_friendly_name
 
210
        );
 
211
        USER_MESSAGE um(message, "high");
 
212
        reply.wreq.insert_no_work_message(um);
 
213
        reply.wreq.no_app_version = true;
 
214
        return NULL;
114
215
    }
115
 
    return 0;
116
 
}
 
216
    return bavp;
 
217
}
 
218
 
 
219
static char* find_user_friendly_name(int appid) {
 
220
        APP* app = ssp->lookup_app(appid);
 
221
        if (app) return app->user_friendly_name;
 
222
    return "deprecated application";
 
223
}
 
224
 
117
225
 
118
226
// Compute the max additional disk usage we can impose on the host.
119
227
// Depending on the client version, it can either send us
178
286
    }
179
287
 
180
288
    if (x < 0) {
181
 
        log_messages.printf(
182
 
            SCHED_MSG_LOG::MSG_NORMAL,
183
 
            "disk_max_used_gb %f disk_max_used_pct %f disk_min_free_gb %f\n",
184
 
            prefs.disk_max_used_gb, prefs.disk_max_used_pct,
185
 
            prefs.disk_min_free_gb
186
 
        );
187
 
        log_messages.printf(
188
 
            SCHED_MSG_LOG::MSG_NORMAL,
189
 
            "host.d_total %f host.d_free %f host.d_boinc_used_total %f\n",
190
 
            host.d_total, host.d_free, host.d_boinc_used_total
191
 
        );
192
 
        log_messages.printf(
193
 
            SCHED_MSG_LOG::MSG_NORMAL,
194
 
            "x1 %f x2 %f x3 %f x %f\n",
195
 
            x1, x2, x3, x
196
 
        );
 
289
        if (config.debug_send) {
 
290
            log_messages.printf(MSG_DEBUG,
 
291
                "Insufficient disk: disk_max_used_gb %f disk_max_used_pct %f disk_min_free_gb %f\n",
 
292
                prefs.disk_max_used_gb, prefs.disk_max_used_pct,
 
293
                prefs.disk_min_free_gb
 
294
            );
 
295
            log_messages.printf(MSG_DEBUG,
 
296
                "Insufficient disk: host.d_total %f host.d_free %f host.d_boinc_used_total %f\n",
 
297
                host.d_total, host.d_free, host.d_boinc_used_total
 
298
            );
 
299
            log_messages.printf(MSG_DEBUG,
 
300
                "Insufficient disk: x1 %f x2 %f x3 %f x %f\n",
 
301
                x1, x2, x3, x
 
302
            );
 
303
        }
 
304
        reply.wreq.disk.set_insufficient(-x);
197
305
    }
198
306
    return x;
199
307
}
222
330
static double estimate_wallclock_duration(
223
331
    WORKUNIT& wu, SCHEDULER_REQUEST& request, SCHEDULER_REPLY& reply
224
332
) {
225
 
    double running_frac;
226
 
    if (reply.wreq.core_client_version<=419) {
227
 
        running_frac = reply.host.on_frac;
228
 
    } else {
229
 
        running_frac = reply.host.active_frac * reply.host.on_frac;
230
 
    }
231
 
    if (running_frac < HOST_ACTIVE_FRAC_MIN) {
232
 
        running_frac = HOST_ACTIVE_FRAC_MIN;
233
 
    }
234
 
    if (running_frac > 1) running_frac = 1;
235
333
    double ecd = estimate_cpu_duration(wu, reply);
236
 
    double ewd = ecd/running_frac;
 
334
    double ewd = ecd/reply.wreq.running_frac;
237
335
    if (reply.host.duration_correction_factor) {
238
336
        ewd *= reply.host.duration_correction_factor;
239
337
    }
240
338
    if (reply.host.cpu_efficiency) {
241
339
        ewd /= reply.host.cpu_efficiency;
242
340
    }
243
 
    log_messages.printf(
244
 
        SCHED_MSG_LOG::MSG_DEBUG,
245
 
        "est cpu dur %f; running_frac %f; rsf %f; est %f\n",
246
 
        ecd, running_frac, request.resource_share_fraction, ewd
247
 
    );
 
341
    if (config.debug_send) {
 
342
        log_messages.printf(MSG_DEBUG,
 
343
            "est cpu dur %f;  est wall dur %f\n",
 
344
            ecd, reply.wreq.running_frac, ewd
 
345
        );
 
346
    }
248
347
    return ewd;
249
348
}
250
349
 
251
 
// Find or compute various info about the host.
252
 
// These parameters affect how work is sent to the host
 
350
// Find or compute various info about the host;
 
351
// this info affects which jobs are sent to the host.
253
352
//
254
353
static int get_host_info(SCHEDULER_REPLY& reply) {
255
354
    char buf[8096];
256
 
        std::string str;
257
 
        extract_venue(reply.user.project_prefs, reply.host.venue, buf);
258
 
        str = buf;
259
 
        unsigned int pos = 0;
260
 
        int temp_int;
 
355
    string str;
 
356
    unsigned int pos = 0;
 
357
    int temp_int;
 
358
    bool flag;
 
359
 
 
360
    extract_venue(reply.user.project_prefs, reply.host.venue, buf);
 
361
    str = buf;
261
362
 
262
363
    // scan user's project prefs for elements of the form <app_id>N</app_id>,
263
364
    // indicating the apps they want to run.
264
365
    //
265
366
    reply.wreq.host_info.preferred_apps.clear();
266
 
        while (parse_int(str.substr(pos,str.length()-pos).c_str(), "<app_id>", temp_int)) {
 
367
    while (parse_int(str.substr(pos,str.length()-pos).c_str(), "<app_id>", temp_int)) {
267
368
        APP_INFO ai;
268
369
        ai.appid = temp_int;
 
370
        ai.work_available = false;
269
371
        reply.wreq.host_info.preferred_apps.push_back(ai);
270
372
 
271
 
                pos = str.find("<app_id>", pos) + 1;
 
373
        pos = str.find("<app_id>", pos) + 1;
 
374
    }
 
375
        if (parse_bool(buf,"allow_non_preferred_apps", flag)) {
 
376
            reply.wreq.host_info.allow_non_preferred_apps = flag;
 
377
    }
 
378
        if (parse_bool(buf,"allow_beta_work", flag)) {
 
379
        reply.wreq.host_info.allow_beta_work = flag;
272
380
        }
273
 
    temp_int = parse_int(buf,"<allow_beta_work>", temp_int);
274
 
    reply.wreq.host_info.allow_beta_work = temp_int;
275
381
 
276
 
    // Decide whether or not this computer is a 'reliable' computer
 
382
    // Decide whether or not this computer is 'reliable'
 
383
    // A computer is reliable if the following conditions are true
 
384
    // (for those that are set in the config file)
 
385
    // 1) The host average turnaround is less than the config
 
386
    // max average turnaround
 
387
    // 2) The host error rate is less then the config max error rate
 
388
    // 3) The host results per day is equal to the config file value
277
389
    //
278
390
    double expavg_credit = reply.host.expavg_credit;
279
391
    double expavg_time = reply.host.expavg_time;
280
392
    double avg_turnaround = reply.host.avg_turnaround;
281
393
    update_average(0, 0, CREDIT_HALF_LIFE, expavg_credit, expavg_time);
282
 
    double credit_scale, turnaround_scale;
283
 
    if (strstr(reply.host.os_name,"Windows") || strstr(reply.host.os_name,"Linux")
284
 
    ) {
285
 
        credit_scale = 1;
286
 
        turnaround_scale = 1;
 
394
 
 
395
        // Platforms other then Windows, Linux and Intel Macs need a
 
396
    // larger set of computers to be marked reliable
 
397
    //
 
398
    double multiplier = 1.0;
 
399
    if (strstr(reply.host.os_name,"Windows")
 
400
        || strstr(reply.host.os_name,"Linux")
 
401
        || (strstr(reply.host.os_name,"Darwin")
 
402
            && !(strstr(reply.host.p_vendor,"Power Macintosh"))
 
403
    )) {
 
404
        multiplier = 1.0;
287
405
    } else {
288
 
        credit_scale = .75;
289
 
        turnaround_scale = 1.25;
 
406
        multiplier = 1.8;
290
407
    }
291
408
 
292
 
    if (((expavg_credit/reply.host.p_ncpus) > config.reliable_min_avg_credit*credit_scale || config.reliable_min_avg_credit == 0)
293
 
            && (avg_turnaround < config.reliable_max_avg_turnaround*turnaround_scale || config.reliable_max_avg_turnaround == 0)
294
 
    ){
 
409
    if ((config.reliable_max_avg_turnaround == 0 || reply.host.avg_turnaround < config.reliable_max_avg_turnaround*multiplier)
 
410
        && (config.reliable_max_error_rate == 0 || reply.host.error_rate < config.reliable_max_error_rate*multiplier)
 
411
        && (config.daily_result_quota == 0 || reply.host.max_results_day >= config.daily_result_quota)
 
412
     ) {
295
413
        reply.wreq.host_info.reliable = true;
296
 
        log_messages.printf(SCHED_MSG_LOG::MSG_DEBUG,
297
 
            "[HOST#%d] is reliable (OS = %s) expavg_credit = %.0f avg_turnaround(hours) = %.0f \n",
298
 
            reply.host.id, reply.host.os_name, expavg_credit,
299
 
            avg_turnaround/3600
 
414
    }
 
415
    if (config.debug_send) {
 
416
        log_messages.printf(MSG_DEBUG,
 
417
            "[HOST#%d] is%s reliable (OS = %s) error_rate = %.3f avg_turn_hrs = %.0f \n",
 
418
            reply.host.id,
 
419
            reply.wreq.host_info.reliable?"":" not",
 
420
            reply.host.os_name, reply.host.error_rate,
 
421
            reply.host.avg_turnaround/3600
300
422
        );
301
423
    }
302
 
        return 0;
 
424
    return 0;
303
425
}
304
426
 
305
 
// Check to see if the user has set application preferences.
306
 
// If they have, then only send work for the allowed applications
 
427
// Return true if the user has set application preferences,
 
428
// and this job is not for a selected app
307
429
//
308
 
static inline int check_app_filter(
309
 
    WORKUNIT& wu, SCHEDULER_REQUEST& request, SCHEDULER_REPLY& reply
 
430
bool app_not_selected(
 
431
    WORKUNIT& wu, SCHEDULER_REQUEST& , SCHEDULER_REPLY& reply
310
432
) {
311
433
    unsigned int i;
312
434
 
313
 
    if (reply.wreq.host_info.preferred_apps.size() == 0) return 0;
314
 
    bool app_allowed = false;
 
435
    if (reply.wreq.host_info.preferred_apps.size() == 0) return false;
315
436
    for (i=0; i<reply.wreq.host_info.preferred_apps.size(); i++) {
316
 
        if (wu.appid==reply.wreq.host_info.preferred_apps[i].appid) {
317
 
            app_allowed = true;
318
 
            break;
 
437
        if (wu.appid == reply.wreq.host_info.preferred_apps[i].appid) {
 
438
            reply.wreq.host_info.preferred_apps[i].work_available = true;
 
439
            return false;
319
440
        }
320
441
    }
321
 
    if (!app_allowed && !reply.wreq.beta_only) {
322
 
        reply.wreq.no_allowed_apps_available = true;
323
 
        log_messages.printf(SCHED_MSG_LOG::MSG_DEBUG,
324
 
            "[USER#%d] [WU#%d] user doesn't want work for this application\n",
325
 
            reply.user.id, wu.id
326
 
        );
327
 
        return INFEASIBLE_APP_SETTING;
328
 
    }
329
 
    return 0;
 
442
    return true;
330
443
}
331
444
 
332
 
static inline int check_memory(
333
 
    WORKUNIT& wu, SCHEDULER_REQUEST& request, SCHEDULER_REPLY& reply
 
445
// see how much RAM we can use on this machine
 
446
// TODO: compute this once, not once per job
 
447
//
 
448
static inline void get_mem_sizes(
 
449
    SCHEDULER_REQUEST& request, SCHEDULER_REPLY& reply,
 
450
    double& ram, double& usable_ram
334
451
) {
335
 
    // see how much RAM we can use on this machine
336
 
    //
337
 
    double ram = reply.host.m_nbytes;
 
452
    ram = reply.host.m_nbytes;
338
453
    if (ram <= 0) ram = DEFAULT_RAM_SIZE;
339
 
    double usable_ram = ram;
 
454
    usable_ram = ram;
340
455
    double busy_frac = request.global_prefs.ram_max_used_busy_frac;
341
456
    double idle_frac = request.global_prefs.ram_max_used_idle_frac;
342
457
    double frac = 1;
345
460
        if (frac > 1) frac = 1;
346
461
        usable_ram *= frac;
347
462
    }
348
 
 
349
 
    if (wu.rsc_memory_bound > usable_ram) {
350
 
        log_messages.printf(
351
 
            SCHED_MSG_LOG::MSG_DEBUG,
352
 
            "[WU#%d %s] needs %0.2fMB RAM; [HOST#%d] has %0.2fMB, %0.2fMB usable\n",
353
 
            wu.id, wu.name, wu.rsc_memory_bound/MEGA,
354
 
            reply.host.id, ram/MEGA, usable_ram/MEGA
 
463
}
 
464
 
 
465
static inline int check_memory(
 
466
    WORKUNIT& wu, SCHEDULER_REQUEST& request, SCHEDULER_REPLY& reply
 
467
) {
 
468
    double ram, usable_ram;
 
469
    get_mem_sizes(request, reply, ram, usable_ram);
 
470
 
 
471
    double diff = wu.rsc_memory_bound - usable_ram;
 
472
    if (diff > 0) {
 
473
        char message[256];
 
474
        sprintf(message,
 
475
            "%s needs %0.2f MB RAM but only %0.2f MB is available for use.",
 
476
            find_user_friendly_name(wu.appid),
 
477
            wu.rsc_memory_bound/MEGA, usable_ram/MEGA
355
478
        );
356
 
        // only add message once
357
 
        //
358
 
        if (!reply.wreq.insufficient_mem) {
359
 
            char explanation[256];
360
 
            if (wu.rsc_memory_bound > ram) {
361
 
                sprintf(explanation,
362
 
                    "Your computer has %0.2fMB of memory, and a job requires %0.2fMB",
363
 
                    ram/MEGA, wu.rsc_memory_bound/MEGA
364
 
                );
365
 
            } else {
366
 
                sprintf(explanation,
367
 
                    "Your preferences limit memory usage to %0.2fMB, and a job requires %0.2fMB",
368
 
                    usable_ram/MEGA, wu.rsc_memory_bound/MEGA
369
 
                );
370
 
            }
371
 
            USER_MESSAGE um(explanation, "high");
372
 
            reply.insert_message(um);
 
479
        USER_MESSAGE um(message,"high");
 
480
        reply.wreq.insert_no_work_message(um);
 
481
        
 
482
        if (config.debug_send) {
 
483
            log_messages.printf(MSG_DEBUG,
 
484
                "[WU#%d %s] needs %0.2fMB RAM; [HOST#%d] has %0.2fMB, %0.2fMB usable\n",
 
485
                wu.id, wu.name, wu.rsc_memory_bound/MEGA,
 
486
                reply.host.id, ram/MEGA, usable_ram/MEGA
 
487
            );
373
488
        }
374
 
        reply.wreq.insufficient_mem = true;
 
489
        reply.wreq.mem.set_insufficient(wu.rsc_memory_bound);
375
490
        reply.set_delay(DELAY_NO_WORK_TEMP);
376
491
        return INFEASIBLE_MEM;
377
492
    }
379
494
}
380
495
 
381
496
static inline int check_disk(
382
 
    WORKUNIT& wu, SCHEDULER_REQUEST& request, SCHEDULER_REPLY& reply
 
497
    WORKUNIT& wu, SCHEDULER_REQUEST& , SCHEDULER_REPLY& reply
383
498
) {
384
 
    if (wu.rsc_disk_bound > reply.wreq.disk_available) {
385
 
        reply.wreq.insufficient_disk = true;
 
499
    double diff = wu.rsc_disk_bound - reply.wreq.disk_available;
 
500
    if (diff > 0) {
 
501
        char message[256];
 
502
        sprintf(message,
 
503
            "%s needs %0.2fMB more disk space.  You currently have %0.2f MB available and it needs %0.2f MB.",
 
504
            find_user_friendly_name(wu.appid),
 
505
            diff/MEGA, reply.wreq.disk_available/MEGA, wu.rsc_disk_bound/MEGA
 
506
        );
 
507
        USER_MESSAGE um(message,"high");
 
508
        reply.wreq.insert_no_work_message(um);
 
509
 
 
510
        reply.wreq.disk.set_insufficient(diff);
386
511
        return INFEASIBLE_DISK;
387
512
    }
388
513
    return 0;
389
514
}
390
515
 
 
516
static inline int check_bandwidth(
 
517
    WORKUNIT& wu, SCHEDULER_REQUEST& , SCHEDULER_REPLY& reply
 
518
) {
 
519
    if (wu.rsc_bandwidth_bound == 0) return 0;
 
520
    double diff = wu.rsc_bandwidth_bound - reply.host.n_bwdown;
 
521
    if (diff > 0) {
 
522
        char message[256];
 
523
        sprintf(message,
 
524
            "%s requires %0.2f kbps download bandwidth.  Your computer has been measured at %0.2f kbps.",
 
525
            find_user_friendly_name(wu.appid),
 
526
            wu.rsc_bandwidth_bound/KILO, reply.host.n_bwdown/KILO
 
527
        );
 
528
        USER_MESSAGE um(message,"high");
 
529
        reply.wreq.insert_no_work_message(um);
 
530
 
 
531
        reply.wreq.bandwidth.set_insufficient(diff);
 
532
        return INFEASIBLE_BANDWIDTH;
 
533
    }
 
534
    return 0;
 
535
}
 
536
 
391
537
static inline int check_deadline(
392
538
    WORKUNIT& wu, SCHEDULER_REQUEST& request, SCHEDULER_REPLY& reply
393
539
) {
398
544
        double ewd = estimate_wallclock_duration(wu, request, reply);
399
545
        double est_completion_delay = request.estimated_delay + ewd;
400
546
        double est_report_delay = max(est_completion_delay, request.global_prefs.work_buf_min());
401
 
        if (est_report_delay> wu.delay_bound) {
402
 
            log_messages.printf(
403
 
                SCHED_MSG_LOG::MSG_DEBUG,
404
 
                "[WU#%d %s] needs %d seconds on [HOST#%d]; delay_bound is %d (request.estimated_delay is %f)\n",
405
 
                wu.id, wu.name, (int)ewd, reply.host.id, wu.delay_bound, request.estimated_delay
406
 
            );
407
 
            reply.wreq.insufficient_speed = true;
 
547
        double diff = est_report_delay - wu.delay_bound;
 
548
        if (diff > 0) {
 
549
            if (config.debug_send) {
 
550
                log_messages.printf(MSG_DEBUG,
 
551
                    "[WU#%d %s] est report delay %d on [HOST#%d]; delay_bound is %d\n",
 
552
                    wu.id, wu.name, (int)est_report_delay,
 
553
                    reply.host.id, wu.delay_bound
 
554
                );
 
555
            }
 
556
            reply.wreq.speed.set_insufficient(diff);
408
557
            return INFEASIBLE_CPU;
409
558
        }
410
559
    }
411
560
    return 0;
412
561
}
413
562
 
414
 
// Quick checks (no DB access) to see if the WU can be sent on the host.
 
563
// Fast checks (no DB access) to see if the job can be sent to the host.
415
564
// Reasons why not include:
416
565
// 1) the host doesn't have enough memory;
417
566
// 2) the host doesn't have enough disk space;
419
568
//    the host probably won't get the result done within the delay bound
420
569
// 4) app isn't in user's "approved apps" list
421
570
//
422
 
// TODO: this should be used in locality scheduling case too.
423
 
// Should move a few other checks from sched_array.C
424
 
//
425
 
int wu_is_infeasible(
426
 
    WORKUNIT& wu, SCHEDULER_REQUEST& request, SCHEDULER_REPLY& reply,
427
 
    APP* app
 
571
int wu_is_infeasible_fast(
 
572
    WORKUNIT& wu, SCHEDULER_REQUEST& request, SCHEDULER_REPLY& reply, APP& app
428
573
) {
429
574
    int retval;
430
575
 
431
576
    // homogeneous redundancy, quick check
432
577
    //
433
 
    if (config.homogeneous_redundancy || app->homogeneous_redundancy) {
434
 
        if (already_sent_to_different_platform_quick(request, wu)) {
435
 
            log_messages.printf(
436
 
                SCHED_MSG_LOG::MSG_DEBUG,
437
 
                "[HOST#%d] [WU#%d %s] failed quick HR check: WU is class %d, host is class %d\n",
438
 
                reply.host.id, wu.id, wu.name, wu.hr_class, hr_class(request.host)
439
 
            );
 
578
    if (app_hr_type(app)) {
 
579
        if (hr_unknown_platform_type(reply.host, app_hr_type(app))) {
 
580
            if (config.debug_send) {
 
581
                log_messages.printf(MSG_DEBUG,
 
582
                    "[HOST#%d] [WU#%d %s] host is of unknown class in HR type %d\n",
 
583
                    reply.host.id, wu.id, app_hr_type(app)
 
584
                );
 
585
            }
 
586
            return INFEASIBLE_HR;
 
587
        }
 
588
        if (already_sent_to_different_platform_quick(request, wu, app)) {
 
589
            if (config.debug_send) {
 
590
                log_messages.printf(MSG_DEBUG,
 
591
                    "[HOST#%d] [WU#%d %s] failed quick HR check: WU is class %d, host is class %d\n",
 
592
                    reply.host.id, wu.id, wu.name, wu.hr_class, hr_class(request.host, app_hr_type(app))
 
593
                );
 
594
            }
440
595
            return INFEASIBLE_HR;
441
596
        }
442
597
    }
447
602
        }
448
603
    }
449
604
 
450
 
    retval = check_app_filter(wu, request, reply);
451
 
    if (retval) return retval;
452
605
    retval = check_memory(wu, request, reply);
453
606
    if (retval) return retval;
454
607
    retval = check_disk(wu, request, reply);
455
608
    if (retval) return retval;
 
609
    retval = check_bandwidth(wu, request, reply);
 
610
    if (retval) return retval;
456
611
 
457
612
    // do this last because EDF sim uses some CPU
458
613
    //
460
615
        double est_cpu = estimate_cpu_duration(wu, reply);
461
616
        IP_RESULT candidate("", wu.delay_bound, est_cpu);
462
617
        strcpy(candidate.name, wu.name);
463
 
        if (check_candidate(candidate, reply.host.p_ncpus, request.ip_results)) {
 
618
        if (check_candidate(candidate, effective_ncpus(reply.host), request.ip_results)) {
464
619
            // it passed the feasibility test,
465
620
            // but don't add it the the workload yet;
466
621
            // wait until we commit to sending it
467
622
        } else {
468
 
            reply.wreq.insufficient_speed = true;
 
623
            reply.wreq.speed.set_insufficient(0);
469
624
            return INFEASIBLE_WORKLOAD;
470
625
        }
471
626
    } else {
480
635
//
481
636
int insert_after(char* buffer, const char* after, const char* text) {
482
637
    char* p;
483
 
    char temp[LARGE_BLOB_SIZE];
 
638
    char temp[BLOB_SIZE];
484
639
 
485
 
    if (strlen(buffer) + strlen(text) > LARGE_BLOB_SIZE-1) {
486
 
        log_messages.printf(SCHED_MSG_LOG::MSG_CRITICAL,
 
640
    if (strlen(buffer) + strlen(text) > BLOB_SIZE-1) {
 
641
        log_messages.printf(MSG_CRITICAL,
487
642
            "insert_after: overflow: %d %d\n",
488
643
            strlen(buffer), strlen(text)
489
644
        );
491
646
    }
492
647
    p = strstr(buffer, after);
493
648
    if (!p) {
494
 
        log_messages.printf(SCHED_MSG_LOG::MSG_CRITICAL,
 
649
        log_messages.printf(MSG_CRITICAL,
495
650
            "insert_after: %s not found in %s\n", after, buffer
496
651
        );
497
652
        return ERR_NULL;
507
662
// in preparation for sending it to a client
508
663
//
509
664
int insert_wu_tags(WORKUNIT& wu, APP& app) {
510
 
    char buf[LARGE_BLOB_SIZE];
 
665
    char buf[BLOB_SIZE];
511
666
    
512
667
    sprintf(buf,
513
668
        "    <rsc_fpops_est>%f</rsc_fpops_est>\n"
526
681
    return insert_after(wu.xml_doc, "<workunit>\n", buf);
527
682
}
528
683
 
529
 
// return the APP and APP_VERSION for the given WU, for the given platform.
530
 
// return false if none
531
 
//
532
 
bool find_app_version(
533
 
    WORK_REQ& wreq, WORKUNIT& wu, PLATFORM_LIST& platforms, SCHED_SHMEM& ss,
534
 
    APP*& app, APP_VERSION*& avp
535
 
) {
536
 
    app = ss.lookup_app(wu.appid);
537
 
    if (!app) {
538
 
        log_messages.printf(
539
 
            SCHED_MSG_LOG::MSG_CRITICAL, "Can't find APP#%d\n", wu.appid
540
 
        );
541
 
        return false;
542
 
    }
543
 
    unsigned int i;
544
 
    for (i=0; i<platforms.list.size(); i++) {
545
 
        PLATFORM* p = platforms.list[i];
546
 
        avp = ss.lookup_app_version(app->id, p->id, app->min_version);
547
 
        if (avp) return true;
548
 
    }
549
 
    log_messages.printf(
550
 
        SCHED_MSG_LOG::MSG_DEBUG,
551
 
        "no app version available: APP#%d PLATFORM#%d min_version %d\n",
552
 
        app->id, platforms.list[0]->id, app->min_version
553
 
    );
554
 
    wreq.no_app_version = true;
555
 
    return false;
556
 
}
557
 
 
558
684
// verify that the given APP_VERSION will work with the core client
559
685
//
560
686
bool app_core_compatible(WORK_REQ& wreq, APP_VERSION& av) {
561
687
    if (wreq.core_client_version < av.min_core_version) {
562
 
#if 0
563
 
        log_messages.printf(
564
 
            SCHED_MSG_LOG::MSG_DEBUG,
565
 
            "Outdated core version: wanted %d, got %d\n",
566
 
            av.min_core_version, wreq.core_client_version
567
 
        );
568
 
#endif
569
688
        wreq.outdated_core = true;
570
689
        return false;
571
690
    }
573
692
}
574
693
 
575
694
// add the given workunit to a reply.
576
 
// look up its app, and make sure there's a version for this platform.
577
695
// Add the app and app_version to the reply also.
578
696
//
579
697
int add_wu_to_reply(
580
 
    WORKUNIT& wu, SCHEDULER_REPLY& reply, PLATFORM_LIST& platforms,
581
 
    APP* app, APP_VERSION* avp
 
698
    WORKUNIT& wu, SCHEDULER_REPLY& reply, APP* app, BEST_APP_VERSION* bavp
582
699
) {
583
700
    int retval;
584
701
    WORKUNIT wu2, wu3;
585
702
    
 
703
    APP_VERSION* avp = bavp->avp;
 
704
    if (avp == (APP_VERSION*)1) avp = NULL;
 
705
 
586
706
    // add the app, app_version, and workunit to the reply,
587
707
    // but only if they aren't already there
588
708
    //
594
714
        }
595
715
        
596
716
        reply.insert_app_unique(*app);
 
717
        av2.bavp = bavp;
597
718
        reply.insert_app_version_unique(*avp2);
598
 
        log_messages.printf(
599
 
            SCHED_MSG_LOG::MSG_DEBUG,
600
 
            "[HOST#%d] Sending app_version %s %d %d\n",
601
 
            reply.host.id, app->name, avp2->platformid, avp2->version_num
602
 
        );
 
719
        if (config.debug_send) {
 
720
            log_messages.printf(MSG_DEBUG,
 
721
                "[HOST#%d] Sending app_version %s %d %d\n",
 
722
                reply.host.id, app->name, avp2->platformid, avp2->version_num
 
723
            );
 
724
        }
603
725
    }
604
726
 
605
727
    // add time estimate to reply
607
729
    wu2 = wu;       // make copy since we're going to modify its XML field
608
730
    retval = insert_wu_tags(wu2, *app);
609
731
    if (retval) {
610
 
        log_messages.printf(SCHED_MSG_LOG::MSG_NORMAL,
611
 
            "insert_wu_tags failed %d\n", retval
612
 
        );
 
732
        log_messages.printf(MSG_NORMAL, "insert_wu_tags failed %d\n", retval);
613
733
        return retval;
614
734
    }
615
735
    wu3=wu2;
690
810
        // if we've failed to send a result because of a transient condition,
691
811
        // return false to preserve invariant
692
812
        //
693
 
        if (wreq.insufficient_disk || wreq.insufficient_speed || wreq.insufficient_mem || wreq.no_allowed_apps_available) {
 
813
        if (wreq.disk.insufficient || wreq.speed.insufficient || wreq.mem.insufficient || wreq.no_allowed_apps_available) {
694
814
            return false;
695
815
        }
696
816
    }
697
817
    if (wreq.seconds_to_fill <= 0) return false;
698
818
    if (wreq.disk_available <= 0) {
699
 
        wreq.insufficient_disk = true;
700
819
        return false;
701
820
    }
702
821
    if (wreq.nresults >= config.max_wus_to_send) return false;
703
822
 
704
 
    // config.daily_result_quota is PER CPU (up to max of MAX_CPUS CPUs)
 
823
    int ncpus = effective_ncpus(host);
 
824
 
705
825
    // host.max_results_day is between 1 and config.daily_result_quota inclusive
706
 
    // wreq.daily_result_quota is between ncpus and ncpus*host.max_results_day inclusive
 
826
    // wreq.daily_result_quota is between ncpus
 
827
    // and ncpus*host.max_results_day inclusive
 
828
    //
707
829
    if (config.daily_result_quota) {
708
 
        if (host.max_results_day <= 0 || host.max_results_day>config.daily_result_quota) {
 
830
        if (host.max_results_day == 0 || host.max_results_day>config.daily_result_quota) {
709
831
            host.max_results_day = config.daily_result_quota;
710
832
        }
711
 
        // scale daily quota by #CPUs, up to a limit of MAX_CPUS 4
712
 
        //
713
 
        int ncpus = host.p_ncpus;
714
 
        if (ncpus > MAX_CPUS) ncpus = MAX_CPUS;
715
 
        if (ncpus < 1) ncpus = 1;
716
833
        wreq.daily_result_quota = ncpus*host.max_results_day;
717
834
        if (host.nresults_today >= wreq.daily_result_quota) {
718
835
            wreq.daily_result_quota_exceeded = true;
721
838
    }
722
839
 
723
840
    if (config.max_wus_in_progress) {
724
 
        int limit = config.max_wus_in_progress;
725
 
        if (wreq.nresults_on_host >= limit) {
726
 
            log_messages.printf(
727
 
                SCHED_MSG_LOG::MSG_DEBUG,
728
 
                "cache limit exceeded; %d > %d\n",
729
 
                wreq.nresults_on_host, config.max_wus_in_progress
730
 
            );
731
 
            wreq.cache_size_exceeded=true;
 
841
        if (wreq.nresults_on_host >= config.max_wus_in_progress*ncpus) {
 
842
            if (config.debug_send) {
 
843
                log_messages.printf(MSG_DEBUG,
 
844
                    "in-progress job limit exceeded; %d > %d*%d\n",
 
845
                    wreq.nresults_on_host, config.max_wus_in_progress, ncpus
 
846
                );
 
847
            }
 
848
            wreq.cache_size_exceeded = true;
732
849
            return false;
733
850
        }
734
851
    }
751
868
 
752
869
int add_result_to_reply(
753
870
    DB_RESULT& result, WORKUNIT& wu, SCHEDULER_REQUEST& request,
754
 
    SCHEDULER_REPLY& reply, PLATFORM_LIST& platforms,
755
 
    APP* app, APP_VERSION* avp
 
871
    SCHEDULER_REPLY& reply, BEST_APP_VERSION* bavp
756
872
) {
757
873
    int retval;
758
874
    double wu_seconds_filled;
759
875
    bool resent_result = false;
 
876
    APP* app = ssp->lookup_app(wu.appid);
760
877
 
761
 
    retval = add_wu_to_reply(wu, reply, platforms, app, avp);
 
878
    retval = add_wu_to_reply(wu, reply, app, bavp);
762
879
    if (retval) return retval;
763
880
 
764
881
    // in the scheduling locality case,
780
897
 
781
898
    int delay_bound = wu.delay_bound;
782
899
    if (result.server_state != RESULT_SERVER_STATE_IN_PROGRESS) {
783
 
        // If the workunit needs reliable and is being sent to a reliable host,
784
 
        // then shorten the delay bound by the percent specified
785
 
        //
786
 
        if (config.reliable_time && reply.wreq.host_info.reliable && config.reliable_reduced_delay_bound > 0.01) {
787
 
                if ((wu.create_time + config.reliable_time) <= time(0)) {
788
 
                delay_bound = (int) (delay_bound * config.reliable_reduced_delay_bound);
789
 
                }
790
 
        }
791
 
        
792
900
        // We are sending this result for the first time
793
901
        //
 
902
        // If the workunit needs reliable and is being sent to a reliable host,
 
903
        // then shorten the delay bound by the percent specified
 
904
        //
 
905
        if (config.reliable_on_priority && result.priority >= config.reliable_on_priority && config.reliable_reduced_delay_bound > 0.01
 
906
        ) {
 
907
                        double reduced_delay_bound = delay_bound*config.reliable_reduced_delay_bound;
 
908
                        double est_wallclock_duration = estimate_wallclock_duration(wu, request, reply);
 
909
            // Check to see how reasonable this reduced time is.
 
910
            // Increase it to twice the estimated delay bound
 
911
            // if all the following apply:
 
912
            //
 
913
                        // 1) Twice the estimate is longer then the reduced delay bound
 
914
                        // 2) Twice the estimate is less then the original delay bound
 
915
                        // 3) Twice the estimate is less then the twice the reduced delay bound
 
916
                        if (est_wallclock_duration*2 > reduced_delay_bound && est_wallclock_duration*2 < delay_bound && est_wallclock_duration*2 < delay_bound*config.reliable_reduced_delay_bound*2 ) {
 
917
                        reduced_delay_bound = est_wallclock_duration*2;
 
918
            }
 
919
                        delay_bound = (int) reduced_delay_bound;
 
920
        }
 
921
 
794
922
        result.report_deadline = result.sent_time + delay_bound;
795
923
        result.server_state = RESULT_SERVER_STATE_IN_PROGRESS;
796
924
    } else {
797
 
        // Result was ALREADY sent to this host but never arrived.
798
 
        // So we are resending it.
799
 
        // result.report_deadline and time_sent
800
 
        // have already been updated before this function was called.
 
925
        // Result was already sent to this host but was lost,
 
926
        // so we are resending it.
801
927
        //
802
928
        resent_result = true;
803
929
 
 
930
        // TODO: explain the following
 
931
        //
804
932
        if (result.report_deadline < result.sent_time) {
805
933
            result.report_deadline = result.sent_time + 10;
806
934
        }
808
936
            result.report_deadline = result.sent_time + delay_bound;
809
937
        }
810
938
 
811
 
        log_messages.printf(
812
 
            SCHED_MSG_LOG::MSG_DEBUG,
813
 
            "[RESULT#%d] [HOST#%d] (resend lost work)\n",
814
 
            result.id, reply.host.id
815
 
        );
 
939
        if (config.debug_send) {
 
940
            log_messages.printf(MSG_DEBUG,
 
941
                "[RESULT#%d] [HOST#%d] (resend lost work)\n",
 
942
                result.id, reply.host.id
 
943
            );
 
944
        }
816
945
    }
817
946
    retval = result.mark_as_sent(old_server_state);
818
947
    if (retval==ERR_DB_NOT_FOUND) {
819
 
        log_messages.printf(
820
 
            SCHED_MSG_LOG::MSG_CRITICAL,
 
948
        log_messages.printf(MSG_CRITICAL,
821
949
            "[RESULT#%d] [HOST#%d]: CAN'T SEND, already sent to another host\n",
822
950
            result.id, reply.host.id
823
951
        );
824
952
    } else if (retval) {
825
 
        log_messages.printf(
826
 
            SCHED_MSG_LOG::MSG_CRITICAL,
 
953
        log_messages.printf(MSG_CRITICAL,
827
954
            "add_result_to_reply: can't update result: %d\n", retval
828
955
        );
829
956
    }
830
957
    if (retval) return retval;
831
958
 
832
959
    wu_seconds_filled = estimate_wallclock_duration(wu, request, reply);
833
 
    log_messages.printf(
834
 
        SCHED_MSG_LOG::MSG_NORMAL,
835
 
        "[HOST#%d] Sending [RESULT#%d %s] (fills %.2f seconds)\n",
836
 
        reply.host.id, result.id, result.name, wu_seconds_filled
837
 
    );
 
960
    if (config.debug_send) {
 
961
        log_messages.printf(MSG_NORMAL,
 
962
            "[HOST#%d] Sending [RESULT#%d %s] (fills %.2f seconds)\n",
 
963
            reply.host.id, result.id, result.name, wu_seconds_filled
 
964
        );
 
965
    }
838
966
 
839
967
    retval = update_wu_transition_time(wu, result.report_deadline);
840
968
    if (retval) {
841
 
        log_messages.printf(
842
 
            SCHED_MSG_LOG::MSG_CRITICAL,
 
969
        log_messages.printf(MSG_CRITICAL,
843
970
            "add_result_to_reply: can't update WU transition time: %d\n",
844
971
            retval
845
972
        );
851
978
    //
852
979
    retval = insert_name_tags(result, wu);
853
980
    if (retval) {
854
 
        log_messages.printf(
855
 
            SCHED_MSG_LOG::MSG_CRITICAL,
 
981
        log_messages.printf(MSG_CRITICAL,
856
982
            "add_result_to_reply: can't insert name tags: %d\n",
857
983
            retval
858
984
        );
860
986
    }
861
987
    retval = insert_deadline_tag(result);
862
988
    if (retval) {
863
 
        log_messages.printf(
864
 
            SCHED_MSG_LOG::MSG_CRITICAL,
 
989
        log_messages.printf(MSG_CRITICAL,
865
990
            "add_result_to_reply: can't insert deadline tag: %d\n", retval
866
991
        );
867
992
        return retval;
868
993
    }
869
 
    if (avp) {
870
 
        PLATFORM* pp = ssp->lookup_platform_id(avp->platformid);
871
 
        strcpy(result.platform_name, pp->name);
872
 
        result.version_num = avp->version_num;
873
 
    }
 
994
    result.bavp = bavp;
874
995
    reply.insert_result(result);
875
996
    reply.wreq.seconds_to_fill -= wu_seconds_filled;
876
 
    request.estimated_delay += wu_seconds_filled/reply.host.p_ncpus;
 
997
    request.estimated_delay += wu_seconds_filled/effective_ncpus(reply.host);
877
998
    reply.wreq.nresults++;
878
999
    reply.wreq.nresults_on_host++;
879
1000
    if (!resent_result) reply.host.nresults_today++;
885
1006
        IP_RESULT ipr ("", time(0)+wu.delay_bound, est_cpu);
886
1007
        request.ip_results.push_back(ipr);
887
1008
    }
 
1009
 
 
1010
    // mark job as done if debugging flag is set
 
1011
    //
 
1012
    if (mark_jobs_done) {
 
1013
        DB_WORKUNIT dbwu;
 
1014
        char buf[256];
 
1015
        sprintf(buf,
 
1016
            "server_state=%d outcome=%d",
 
1017
            RESULT_SERVER_STATE_OVER, RESULT_OUTCOME_SUCCESS
 
1018
        );
 
1019
        result.update_field(buf);
 
1020
 
 
1021
        dbwu.id = wu.id;
 
1022
        sprintf(buf, "transition_time=%d", time(0));
 
1023
        dbwu.update_field(buf);
 
1024
 
 
1025
    }
888
1026
    return 0;
889
1027
}
890
1028
 
891
 
int send_work(
892
 
    SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply, PLATFORM_LIST& platforms,
893
 
    SCHED_SHMEM& ss
894
 
) {
895
 
    reply.wreq.disk_available = max_allowable_disk(sreq, reply);
896
 
    reply.wreq.insufficient_disk = false;
897
 
    reply.wreq.insufficient_mem = false;
898
 
    reply.wreq.insufficient_speed = false;
899
 
    reply.wreq.excessive_work_buf = false;
900
 
    reply.wreq.no_app_version = false;
901
 
    reply.wreq.hr_reject_temp = false;
902
 
    reply.wreq.hr_reject_perm = false;
903
 
    reply.wreq.daily_result_quota_exceeded = false;
904
 
    reply.wreq.core_client_version = sreq.core_client_major_version*100
905
 
        + sreq.core_client_minor_version;
906
 
    reply.wreq.nresults = 0;
907
 
    get_host_info(reply); // parse project prefs for app details
908
 
    reply.wreq.beta_only = false;
909
 
 
910
 
    log_messages.printf(
911
 
        SCHED_MSG_LOG::MSG_NORMAL,
912
 
        "[HOST#%d] got request for %f seconds of work; available disk %f GB\n",
913
 
        reply.host.id, sreq.work_req_seconds, reply.wreq.disk_available/1e9
914
 
    );
915
 
 
916
 
    if (sreq.work_req_seconds <= 0) return 0;
917
 
 
918
 
    reply.wreq.seconds_to_fill = sreq.work_req_seconds;
919
 
    if (reply.wreq.seconds_to_fill > MAX_SECONDS_TO_SEND) {
920
 
        reply.wreq.seconds_to_fill = MAX_SECONDS_TO_SEND;
921
 
    }
922
 
    if (reply.wreq.seconds_to_fill < MIN_SECONDS_TO_SEND) {
923
 
        reply.wreq.seconds_to_fill = MIN_SECONDS_TO_SEND;
924
 
    }
925
 
 
926
 
    if (config.workload_sim && sreq.have_other_results_list) {
927
 
        init_ip_results(
928
 
            sreq.global_prefs.work_buf_min(), reply.host.p_ncpus, sreq.ip_results
929
 
        );
930
 
    }
931
 
 
932
 
    if (config.locality_scheduling) {
933
 
        reply.wreq.infeasible_only = false;
934
 
        send_work_locality(sreq, reply, platforms, ss);
935
 
    } else {
936
 
        // give top priority to results that require a 'reliable host'
937
 
        //
938
 
        if (reply.wreq.host_info.reliable) {
939
 
                reply.wreq.reliable_only = true;
940
 
                reply.wreq.infeasible_only = false;
941
 
                scan_work_array(sreq, reply, platforms, ss);
942
 
        }
943
 
        reply.wreq.reliable_only = false;
944
 
 
945
 
        // give 2nd priority to results for a beta app
946
 
        // (projects should load beta work with care,
947
 
        // otherwise your users won't get production work done!
948
 
        //
949
 
        if (reply.wreq.host_info.allow_beta_work) {
950
 
            reply.wreq.beta_only = true;
951
 
            log_messages.printf(
952
 
                SCHED_MSG_LOG::MSG_DEBUG,
953
 
                "[HOST#%d] will accept beta work.  Scanning for beta work.\n",
954
 
                reply.host.id
955
 
            );
956
 
            scan_work_array(sreq, reply, platforms, ss);
957
 
        }
958
 
        reply.wreq.beta_only = false;
959
 
        
960
 
        // give next priority to results that were infeasible for some other host
961
 
        //
962
 
        reply.wreq.infeasible_only = true;
963
 
        scan_work_array(sreq, reply, platforms, ss);
964
 
 
965
 
        reply.wreq.infeasible_only = false;
966
 
        scan_work_array(sreq, reply, platforms, ss);
967
 
    }
968
 
 
969
 
    log_messages.printf(
970
 
        SCHED_MSG_LOG::MSG_NORMAL,
971
 
        "[HOST#%d] Sent %d results [scheduler ran %f seconds]\n",
972
 
        reply.host.id, reply.wreq.nresults, elapsed_wallclock_time() 
973
 
    );
974
 
 
 
1029
// send messages to user about why jobs were or weren't sent
 
1030
//
 
1031
static void explain_to_user(SCHEDULER_REPLY& reply) {
 
1032
    char helpful[512];
 
1033
    unsigned int i;
 
1034
 
 
1035
    // If work was sent from apps // the user did not select, explain
 
1036
    //
 
1037
    if (reply.wreq.nresults && !reply.wreq.user_apps_only) {
 
1038
        USER_MESSAGE um("No work can be sent for the applications you have selected", "high");
 
1039
        reply.insert_message(um);
 
1040
 
 
1041
        // Inform the user about applications with no work
 
1042
        for (i=0; i<reply.wreq.host_info.preferred_apps.size(); i++) {
 
1043
                if (!reply.wreq.host_info.preferred_apps[i].work_available) {
 
1044
                        APP* app = ssp->lookup_app(reply.wreq.host_info.preferred_apps[i].appid);
 
1045
                        // don't write message if the app is deprecated
 
1046
                        if (app) {
 
1047
                                char explanation[256];
 
1048
                                sprintf(explanation,
 
1049
                        "No work is available for %s",
 
1050
                        find_user_friendly_name(reply.wreq.host_info.preferred_apps[i].appid)
 
1051
                    );
 
1052
                                USER_MESSAGE um(explanation, "high");
 
1053
                                reply.insert_message(um);
 
1054
                        }
 
1055
                }
 
1056
        }
 
1057
 
 
1058
        // Tell the user about applications they didn't qualify for
 
1059
        //
 
1060
        for(i=0;i<preferred_app_message_index;i++){
 
1061
            reply.insert_message(reply.wreq.no_work_messages.at(i));
 
1062
        }
 
1063
        USER_MESSAGE um1("You have selected to receive work from other applications if no work is available for the applications you selected", "high");
 
1064
        reply.insert_message(um1);
 
1065
        USER_MESSAGE um2("Sending work from other applications", "high");
 
1066
        reply.insert_message(um2);
 
1067
     }
 
1068
 
 
1069
    // if client asked for work and we're not sending any, explain why
 
1070
    //
975
1071
    if (reply.wreq.nresults == 0) {
976
1072
        reply.set_delay(DELAY_NO_WORK_TEMP);
977
1073
        USER_MESSAGE um2("No work sent", "high");
978
1074
        reply.insert_message(um2);
 
1075
        // Inform the user about applications with no work
 
1076
        for(i=0; i<reply.wreq.host_info.preferred_apps.size(); i++) {
 
1077
                if (!reply.wreq.host_info.preferred_apps[i].work_available) {
 
1078
                        APP* app = ssp->lookup_app(reply.wreq.host_info.preferred_apps[i].appid);
 
1079
                        // don't write message if the app is deprecated
 
1080
                        if ( app != NULL ) {
 
1081
                                char explanation[256];
 
1082
                                sprintf(explanation,"No work is available for %s",find_user_friendly_name(reply.wreq.host_info.preferred_apps[i].appid));
 
1083
                                USER_MESSAGE um(explanation, "high");
 
1084
                                reply.insert_message(um);
 
1085
                        }
 
1086
                }
 
1087
        }
 
1088
        // Inform the user about applications they didn't qualify for
 
1089
        for(int i=0;i<reply.wreq.no_work_messages.size();i++){
 
1090
                reply.insert_message(reply.wreq.no_work_messages.at(i));
 
1091
        }
979
1092
        if (reply.wreq.no_app_version) {
980
 
            USER_MESSAGE um("(there was work for other platforms)", "high");
981
 
            reply.insert_message(um);
982
1093
            reply.set_delay(DELAY_NO_WORK_PERM);
983
1094
        }
984
1095
        if (reply.wreq.no_allowed_apps_available) {
985
1096
            USER_MESSAGE um(
986
 
                "(There was work but not for the applications you have allowed.  Please check your settings on the website.)",
987
 
                "high"
988
 
            );
989
 
            reply.insert_message(um);
990
 
        }
991
 
        if (reply.wreq.insufficient_disk) {
992
 
            USER_MESSAGE um(
993
 
                "(there was work but you don't have enough disk space allocated)",
994
 
                "high"
995
 
            );
996
 
            reply.insert_message(um);
997
 
        }
998
 
        if (reply.wreq.insufficient_mem) {
999
 
            USER_MESSAGE um(
1000
 
                "(there was work but your computer doesn't have enough memory)",
1001
 
                "high"
1002
 
            );
1003
 
            reply.insert_message(um);
1004
 
        }
1005
 
        if (reply.wreq.insufficient_speed) {
1006
 
            char helpful[512];
 
1097
                "No work available for the applications you have selected.  Please check your settings on the web site.",
 
1098
                "high"
 
1099
            );
 
1100
            reply.insert_message(um);
 
1101
        }
 
1102
        if (reply.wreq.speed.insufficient) {
1007
1103
            if (reply.wreq.core_client_version>419) {
1008
1104
                sprintf(helpful,
1009
1105
                    "(won't finish in time) "
1010
 
                    "Computer on %.1f%% of time, BOINC on %.1f%% of that, this project gets %.1f%% of that",
1011
 
                    100.0*reply.host.on_frac, 100.0*reply.host.active_frac, 100.0*sreq.resource_share_fraction
 
1106
                    "Computer on %.1f%% of time, BOINC on %.1f%% of that",
 
1107
                    100.0*reply.host.on_frac, 100.0*reply.host.active_frac
1012
1108
                );
1013
 
            }
1014
 
            else {
 
1109
            } else {
1015
1110
                sprintf(helpful,
1016
1111
                    "(won't finish in time) "
1017
 
                    "Computer available %.1f%% of time, this project gets %.1f%% of that",
1018
 
                    100.0*reply.host.on_frac, 100.0*sreq.resource_share_fraction
 
1112
                    "Computer available %.1f%% of time",
 
1113
                    100.0*reply.host.on_frac
1019
1114
                );
1020
1115
            }
1021
1116
            USER_MESSAGE um(helpful, "high");
1037
1132
        }
1038
1133
        if (reply.wreq.outdated_core) {
1039
1134
            USER_MESSAGE um(
1040
 
                " (your core client is out of date - please upgrade)",
 
1135
                " (your BOINC client is old - please install current version)",
1041
1136
                "high"
1042
1137
            );
1043
1138
            reply.insert_message(um);
1044
1139
            reply.set_delay(DELAY_NO_WORK_PERM);
1045
 
            log_messages.printf(
1046
 
                SCHED_MSG_LOG::MSG_NORMAL,
1047
 
                "Not sending work because core client is outdated\n"
 
1140
            log_messages.printf(MSG_NORMAL,
 
1141
                "Not sending work because client is outdated\n"
1048
1142
            );
1049
1143
        }
1050
1144
        if (reply.wreq.excessive_work_buf) {
1055
1149
            reply.insert_message(um);
1056
1150
        }
1057
1151
        if (reply.wreq.daily_result_quota_exceeded) {
1058
 
            char helpful[256];
1059
1152
            struct tm *rpc_time_tm;
1060
1153
            int delay_time;
1061
1154
 
1062
1155
            sprintf(helpful, "(reached daily quota of %d results)", reply.wreq.daily_result_quota);
1063
1156
            USER_MESSAGE um(helpful, "high");
1064
1157
            reply.insert_message(um);
1065
 
            log_messages.printf(
1066
 
                SCHED_MSG_LOG::MSG_NORMAL,
 
1158
            log_messages.printf(MSG_NORMAL,
1067
1159
                "Daily result quota exceeded for host %d\n",
1068
1160
                reply.host.id
1069
1161
            );
1080
1172
            reply.set_delay(delay_time);
1081
1173
        }
1082
1174
        if (reply.wreq.cache_size_exceeded) {
1083
 
            char helpful[256];
1084
 
            sprintf(helpful, "(reached per-host limit of %d tasks)",
 
1175
            sprintf(helpful, "(reached per-CPU limit of %d tasks)",
1085
1176
                config.max_wus_in_progress
1086
1177
            );
1087
1178
            USER_MESSAGE um(helpful, "high");
1088
1179
            reply.insert_message(um);
1089
1180
            reply.set_delay(DELAY_NO_WORK_CACHE);
1090
 
            log_messages.printf(
1091
 
                SCHED_MSG_LOG::MSG_NORMAL,
1092
 
                "host %d already has %d result(s) on cache\n",
 
1181
            log_messages.printf(MSG_NORMAL,
 
1182
                "host %d already has %d result(s) in progress\n",
1093
1183
                reply.host.id, reply.wreq.nresults_on_host
1094
1184
            );
1095
1185
        }        
1096
1186
    }
1097
 
 
 
1187
}
 
1188
 
 
1189
static void get_running_frac(SCHEDULER_REPLY& reply) {
 
1190
    if (reply.wreq.core_client_version<=419) {
 
1191
        reply.wreq.running_frac = reply.host.on_frac;
 
1192
    } else {
 
1193
        reply.wreq.running_frac = reply.host.active_frac * reply.host.on_frac;
 
1194
    }
 
1195
    if (reply.wreq.running_frac < HOST_ACTIVE_FRAC_MIN) {
 
1196
        reply.wreq.running_frac = HOST_ACTIVE_FRAC_MIN;
 
1197
    }
 
1198
    if (reply.wreq.running_frac > 1) reply.wreq.running_frac = 1;
 
1199
}
 
1200
 
 
1201
void send_work(SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply) {
 
1202
    if (sreq.work_req_seconds <= 0) return;
 
1203
 
 
1204
    reply.wreq.core_client_version =
 
1205
        sreq.core_client_major_version*100 + sreq.core_client_minor_version;
 
1206
    reply.wreq.disk_available = max_allowable_disk(sreq, reply);
 
1207
    reply.wreq.core_client_version = sreq.core_client_major_version*100
 
1208
        + sreq.core_client_minor_version;
 
1209
 
 
1210
    if (hr_unknown_platform(sreq.host)) {
 
1211
        reply.wreq.hr_reject_perm = true;
 
1212
        return;
 
1213
    }
 
1214
 
 
1215
    get_host_info(reply); // parse project prefs for app details
 
1216
 
 
1217
    get_running_frac(reply);
 
1218
 
 
1219
    if (config.debug_send) {
 
1220
        log_messages.printf(MSG_DEBUG,
 
1221
            "%s matchmaking scheduling; %s EDF sim\n",
 
1222
#ifdef MATCHMAKER
 
1223
            "Using",
 
1224
#else
 
1225
            "Not using",
 
1226
#endif
 
1227
            config.workload_sim?"Using":"Not using"
 
1228
        );
 
1229
        log_messages.printf(MSG_DEBUG,
 
1230
            "available disk %f GB, work_buf_min %d\n",
 
1231
            reply.wreq.disk_available/1e9,
 
1232
            (int)sreq.global_prefs.work_buf_min()
 
1233
        );
 
1234
        log_messages.printf(MSG_DEBUG,
 
1235
            "running frac %f DCF %f CPU effic %f est delay %d\n",
 
1236
            reply.wreq.running_frac,
 
1237
            reply.host.duration_correction_factor,
 
1238
            reply.host.cpu_efficiency,
 
1239
            (int)sreq.estimated_delay
 
1240
        );
 
1241
    }
 
1242
 
 
1243
    reply.wreq.seconds_to_fill = sreq.work_req_seconds;
 
1244
    if (reply.wreq.seconds_to_fill > MAX_SECONDS_TO_SEND) {
 
1245
        reply.wreq.seconds_to_fill = MAX_SECONDS_TO_SEND;
 
1246
    }
 
1247
    if (reply.wreq.seconds_to_fill < MIN_SECONDS_TO_SEND) {
 
1248
        reply.wreq.seconds_to_fill = MIN_SECONDS_TO_SEND;
 
1249
    }
 
1250
 
 
1251
    if (config.enable_assignment) {
 
1252
        if (send_assigned_jobs(sreq, reply)) {
 
1253
            if (config.debug_assignment) {
 
1254
                log_messages.printf(MSG_DEBUG,
 
1255
                    "[HOST#%d] sent assigned jobs\n", reply.host.id
 
1256
                );
 
1257
            }
 
1258
            return;
 
1259
        }
 
1260
    }
 
1261
 
 
1262
    if (config.workload_sim && sreq.have_other_results_list) {
 
1263
        init_ip_results(
 
1264
            sreq.global_prefs.work_buf_min(), effective_ncpus(reply.host), sreq.ip_results
 
1265
        );
 
1266
    }
 
1267
 
 
1268
    if (config.locality_scheduling) {
 
1269
        reply.wreq.infeasible_only = false;
 
1270
        send_work_locality(sreq, reply);
 
1271
    } else {
 
1272
#ifdef MATCHMAKER
 
1273
        send_work_matchmaker(sreq, reply);
 
1274
#else
 
1275
        reply.wreq.beta_only = false;
 
1276
        reply.wreq.user_apps_only = true;
 
1277
 
 
1278
        // give top priority to results that require a 'reliable host'
 
1279
        //
 
1280
        if (reply.wreq.host_info.reliable) {
 
1281
            reply.wreq.reliable_only = true;
 
1282
            reply.wreq.infeasible_only = false;
 
1283
            scan_work_array(sreq, reply);
 
1284
        }
 
1285
        reply.wreq.reliable_only = false;
 
1286
 
 
1287
        // give 2nd priority to results for a beta app
 
1288
        // (projects should load beta work with care,
 
1289
        // otherwise your users won't get production work done!
 
1290
        //
 
1291
        if (reply.wreq.host_info.allow_beta_work) {
 
1292
            reply.wreq.beta_only = true;
 
1293
            if (config.debug_send) {
 
1294
                log_messages.printf(MSG_DEBUG,
 
1295
                    "[HOST#%d] will accept beta work.  Scanning for beta work.\n",
 
1296
                    reply.host.id
 
1297
                );
 
1298
            }
 
1299
            scan_work_array(sreq, reply);
 
1300
        }
 
1301
        reply.wreq.beta_only = false;
 
1302
 
 
1303
        // give next priority to results that were infeasible for some other host
 
1304
        //
 
1305
        reply.wreq.infeasible_only = true;
 
1306
        scan_work_array(sreq, reply);
 
1307
 
 
1308
        reply.wreq.infeasible_only = false;
 
1309
        scan_work_array(sreq, reply);
 
1310
        
 
1311
        // If user has selected apps but will accept any,
 
1312
        // and we haven't found any jobs for selected apps, try others
 
1313
        //
 
1314
        if (!reply.wreq.nresults && reply.wreq.host_info.allow_non_preferred_apps ) {
 
1315
                reply.wreq.user_apps_only = false;
 
1316
                preferred_app_message_index = reply.wreq.no_work_messages.size();
 
1317
            if (config.debug_send) {
 
1318
                log_messages.printf(MSG_DEBUG,
 
1319
                    "[HOST#%d] is looking for work from a non-preferred application\n",
 
1320
                    reply.host.id
 
1321
                );
 
1322
            }
 
1323
                scan_work_array(sreq, reply);
 
1324
        }
 
1325
#endif
 
1326
    }
 
1327
 
 
1328
    explain_to_user(reply);
 
1329
}
 
1330
 
 
1331
#ifdef MATCHMAKER
 
1332
 
 
1333
struct JOB{
 
1334
    int index;
 
1335
    double score;
 
1336
    double est_time;
 
1337
    double disk_usage;
 
1338
    APP* app;
 
1339
    BEST_APP_VERSION* bavp;
 
1340
 
 
1341
    void get_score(SCHEDULER_REQUEST&, SCHEDULER_REPLY&);
 
1342
};
 
1343
 
 
1344
struct JOB_SET {
 
1345
    double work_req;
 
1346
    double est_time;
 
1347
    double disk_usage;
 
1348
    double disk_limit;
 
1349
    std::list<JOB> jobs;     // sorted high to low
 
1350
 
 
1351
    void add_job(JOB&);
 
1352
    double higher_score_disk_usage(double);
 
1353
    double lowest_score();
 
1354
    inline bool request_satisfied() {
 
1355
        return est_time >= work_req;
 
1356
    }
 
1357
    void send(SCHEDULER_REQUEST&, SCHEDULER_REPLY&);
 
1358
};
 
1359
 
 
1360
// reread result from DB, make sure it's still unsent
 
1361
// TODO: from here to add_result_to_reply()
 
1362
// (which updates the DB record) should be a transaction
 
1363
//
 
1364
int read_sendable_result(DB_RESULT& result) {
 
1365
    int retval = result.lookup_id(result.id);
 
1366
    if (retval) {
 
1367
        log_messages.printf(MSG_CRITICAL,
 
1368
            "[RESULT#%d] result.lookup_id() failed %d\n",
 
1369
            result.id, retval
 
1370
        );
 
1371
        return ERR_NOT_FOUND;
 
1372
    }
 
1373
    if (result.server_state != RESULT_SERVER_STATE_UNSENT) {
 
1374
        log_messages.printf(MSG_NORMAL,
 
1375
            "[RESULT#%d] expected to be unsent; instead, state is %d\n",
 
1376
            result.id, result.server_state
 
1377
        );
 
1378
        return ERR_BAD_RESULT_STATE;
 
1379
    }
1098
1380
    return 0;
1099
1381
}
1100
1382
 
1101
 
const char *BOINC_RCSID_32dcd335e7 = "$Id: sched_send.C 12791 2007-05-31 18:14:45Z boincadm $";
 
1383
// compute a "score" for sending this job to this host.
 
1384
// return 0 if the WU is infeasible
 
1385
//
 
1386
void JOB::get_score(SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply) {
 
1387
    bool found;
 
1388
    WORKUNIT wu;
 
1389
    int retval;
 
1390
 
 
1391
    WU_RESULT& wu_result = ssp->wu_results[index];
 
1392
    wu = wu_result.workunit;
 
1393
    app = ssp->lookup_app(wu.appid);
 
1394
 
 
1395
    score = 0;
 
1396
 
 
1397
    // Find the app and app_version for the client's platform.
 
1398
    //
 
1399
    if (anonymous(sreq.platforms.list[0])) {
 
1400
        found = sreq.has_version(*app);
 
1401
        if (!found) return;
 
1402
        bavp = NULL;
 
1403
    } else {
 
1404
        bavp = get_app_version(sreq, reply, wu);
 
1405
        if (!bavp) return;
 
1406
    }
 
1407
 
 
1408
    retval = wu_is_infeasible_fast(wu, sreq, reply, *app);
 
1409
    if (retval) {
 
1410
        if (config.debug_send) {
 
1411
            log_messages.printf(MSG_DEBUG,
 
1412
                "[HOST#%d] [WU#%d %s] WU is infeasible: %s\n",
 
1413
                reply.host.id, wu.id, wu.name, infeasible_string(retval)
 
1414
            );
 
1415
        }
 
1416
        return;
 
1417
    }
 
1418
 
 
1419
    score = 1;
 
1420
 
 
1421
    // check if user has selected apps
 
1422
    //
 
1423
    if (!reply.wreq.host_info.allow_beta_work || config.distinct_beta_apps) {
 
1424
        if (app_not_selected(wu, sreq, reply)) {
 
1425
            if (!reply.wreq.host_info.allow_non_preferred_apps) {
 
1426
                score = 0;
 
1427
                return;
 
1428
            }
 
1429
        } else {
 
1430
            if (reply.wreq.host_info.allow_non_preferred_apps) {
 
1431
                score += 1;
 
1432
            }
 
1433
        }
 
1434
    }
 
1435
 
 
1436
    // if it's a beta user, try to send beta jobs
 
1437
    //
 
1438
    if (app->beta) {
 
1439
        if (reply.wreq.host_info.allow_beta_work) {
 
1440
            score += 1;
 
1441
        } else {
 
1442
            score = 0;
 
1443
            return;
 
1444
        }
 
1445
    }
 
1446
 
 
1447
    // if job needs to get done fast, send to fast/reliable host
 
1448
    //
 
1449
    if (reply.wreq.host_info.reliable && (wu_result.need_reliable)) {
 
1450
        score += 1;
 
1451
    }
 
1452
    
 
1453
    // if job already committed to an HR class,
 
1454
    // try to send to host in that class
 
1455
    //
 
1456
    if (wu_result.infeasible_count) {
 
1457
        score += 1;
 
1458
    }
 
1459
 
 
1460
    // If user has selected some apps but will accept jobs from others,
 
1461
    // try to send them jobs from the selected apps
 
1462
    //
 
1463
}
 
1464
 
 
1465
bool wu_is_infeasible_slow(
 
1466
    WU_RESULT& wu_result, SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply
 
1467
) {
 
1468
    char buf[256];
 
1469
    int retval;
 
1470
    int n;
 
1471
    DB_RESULT result;
 
1472
 
 
1473
    // Don't send if we've already sent a result of this WU to this user.
 
1474
    //
 
1475
    if (config.one_result_per_user_per_wu) {
 
1476
        sprintf(buf,
 
1477
            "where workunitid=%d and userid=%d",
 
1478
            wu_result.workunit.id, reply.user.id
 
1479
        );
 
1480
        retval = result.count(n, buf);
 
1481
        if (retval) {
 
1482
            log_messages.printf(MSG_CRITICAL,
 
1483
                "send_work: can't get result count (%d)\n", retval
 
1484
            );
 
1485
            return true;
 
1486
        } else {
 
1487
            if (n>0) {
 
1488
                if (config.debug_send) {
 
1489
                    log_messages.printf(MSG_DEBUG,
 
1490
                        "send_work: user %d already has %d result(s) for WU %d\n",
 
1491
                        reply.user.id, n, wu_result.workunit.id
 
1492
                    );
 
1493
                }
 
1494
                return true;
 
1495
            }
 
1496
        }
 
1497
    } else if (config.one_result_per_host_per_wu) {
 
1498
        // Don't send if we've already sent a result
 
1499
        // of this WU to this host.
 
1500
        // We only have to check this
 
1501
        // if we don't send one result per user.
 
1502
        //
 
1503
        sprintf(buf,
 
1504
            "where workunitid=%d and hostid=%d",
 
1505
            wu_result.workunit.id, reply.host.id
 
1506
        );
 
1507
        retval = result.count(n, buf);
 
1508
        if (retval) {
 
1509
            log_messages.printf(MSG_CRITICAL,
 
1510
                "send_work: can't get result count (%d)\n", retval
 
1511
            );
 
1512
            return true;
 
1513
        } else {
 
1514
            if (n>0) {
 
1515
                if (config.debug_send) {
 
1516
                    log_messages.printf(MSG_DEBUG,
 
1517
                        "send_work: host %d already has %d result(s) for WU %d\n",
 
1518
                        reply.host.id, n, wu_result.workunit.id
 
1519
                    );
 
1520
                }
 
1521
                return true;
 
1522
            }
 
1523
        }
 
1524
    }
 
1525
 
 
1526
    APP* app = ssp->lookup_app(wu_result.workunit.appid);
 
1527
    WORKUNIT wu = wu_result.workunit;
 
1528
    if (app_hr_type(*app)) {
 
1529
        if (already_sent_to_different_platform_careful(
 
1530
            sreq, reply.wreq, wu, *app
 
1531
        )) {
 
1532
            if (config.debug_send) {
 
1533
                log_messages.printf(MSG_DEBUG,
 
1534
                    "[HOST#%d] [WU#%d %s] WU is infeasible (assigned to different platform)\n",
 
1535
                    reply.host.id, wu.id, wu.name
 
1536
                );
 
1537
            }
 
1538
            // Mark the workunit as infeasible.
 
1539
            // This ensures that jobs already assigned to a platform
 
1540
            // are processed first.
 
1541
            //
 
1542
            wu_result.infeasible_count++;
 
1543
            return true;
 
1544
        }
 
1545
    }
 
1546
    return false;
 
1547
}
 
1548
 
 
1549
double JOB_SET::lowest_score() {
 
1550
    if (jobs.empty()) return 0;
 
1551
    return jobs.back().score;
 
1552
}
 
1553
 
 
1554
// add the given job, and remove lowest-score jobs
 
1555
// that are in excess of work request
 
1556
// or that cause the disk limit to be exceeded
 
1557
//
 
1558
void JOB_SET::add_job(JOB& job) {
 
1559
    while (!jobs.empty()) {
 
1560
        JOB& worst_job = jobs.back();
 
1561
        if (est_time + job.est_time - worst_job.est_time > work_req) {
 
1562
            est_time -= worst_job.est_time;
 
1563
            disk_usage -= worst_job.disk_usage;
 
1564
            jobs.pop_back();
 
1565
        }
 
1566
    }
 
1567
    while (!jobs.empty()) {
 
1568
        JOB& worst_job = jobs.back();
 
1569
        if (disk_usage + job.disk_usage > disk_limit) {
 
1570
            est_time -= worst_job.est_time;
 
1571
            disk_usage -= worst_job.disk_usage;
 
1572
            jobs.pop_back();
 
1573
        }
 
1574
    }
 
1575
    list<JOB>::iterator i = jobs.begin();
 
1576
    while (i != jobs.end()) {
 
1577
        if (i->score < job.score) {
 
1578
            jobs.insert(i, job);
 
1579
            break;
 
1580
        }
 
1581
        i++;
 
1582
    }
 
1583
    if (i == jobs.end()) {
 
1584
        jobs.push_back(job);
 
1585
    }
 
1586
    est_time += job.est_time;
 
1587
    disk_usage += job.disk_usage;
 
1588
}
 
1589
 
 
1590
// return the disk usage of jobs above the given score
 
1591
//
 
1592
double JOB_SET::higher_score_disk_usage(double v) {
 
1593
    double sum = 0;
 
1594
    list<JOB>::iterator i = jobs.begin();
 
1595
    while (i != jobs.end()) {
 
1596
        if (i->score < v) break;
 
1597
        sum += i->disk_usage;
 
1598
        i++;
 
1599
    }
 
1600
    return sum;
 
1601
}
 
1602
 
 
1603
void JOB_SET::send(SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply) {
 
1604
    WORKUNIT wu;
 
1605
    DB_RESULT result;
 
1606
    int retval;
 
1607
 
 
1608
    list<JOB>::iterator i = jobs.begin();
 
1609
    while (i != jobs.end()) {
 
1610
        JOB& job = *i;
 
1611
        WU_RESULT wu_result = ssp->wu_results[job.index];
 
1612
        ssp->wu_results[job.index].state = WR_STATE_EMPTY;
 
1613
        wu = wu_result.workunit;
 
1614
        result.id = wu_result.resultid;
 
1615
        retval = read_sendable_result(result);
 
1616
        if (retval) continue;
 
1617
        add_result_to_reply(result, wu, sreq, reply, job.bavp);
 
1618
        i++;
 
1619
    }
 
1620
}
 
1621
 
 
1622
void send_work_matchmaker(SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply) {
 
1623
    int i, slots_scanned=0, slots_locked=0;
 
1624
    JOB_SET jobs;
 
1625
    int min_slots = 20;
 
1626
    int max_slots = 50;
 
1627
    int max_locked = 10;
 
1628
    int pid = getpid();
 
1629
 
 
1630
    lock_sema();
 
1631
    i = rand() % ssp->max_wu_results;
 
1632
    while (1) {
 
1633
        i = (i+1) % ssp->max_wu_results;
 
1634
        slots_scanned++;
 
1635
        if (slots_scanned >= max_slots) break;
 
1636
        WU_RESULT& wu_result = ssp->wu_results[i];
 
1637
        switch (wu_result.state) {
 
1638
        case WR_STATE_EMPTY:
 
1639
            continue;
 
1640
        case WR_STATE_PRESENT:
 
1641
            break;
 
1642
        default:
 
1643
            slots_locked++;
 
1644
            continue;
 
1645
        }
 
1646
 
 
1647
        JOB job;
 
1648
        job.index = i;
 
1649
        job.get_score(sreq, reply);
 
1650
        if (config.debug_send) {
 
1651
            log_messages.printf(MSG_DEBUG,
 
1652
                "score for %s: %f\n", wu_result.workunit.name, job.score
 
1653
            );
 
1654
        }
 
1655
        if (job.score > jobs.lowest_score()) {
 
1656
            ssp->wu_results[i].state = pid;
 
1657
            unlock_sema();
 
1658
            if (wu_is_infeasible_slow(wu_result, sreq, reply)) {
 
1659
                lock_sema();
 
1660
                ssp->wu_results[i].state = WR_STATE_EMPTY;
 
1661
                continue;
 
1662
            }
 
1663
            lock_sema();
 
1664
            jobs.add_job(job);
 
1665
        }
 
1666
 
 
1667
        if (jobs.request_satisfied() && slots_scanned>=min_slots) break;
 
1668
    }
 
1669
 
 
1670
    jobs.send(sreq, reply);
 
1671
    unlock_sema();
 
1672
    if (slots_locked > max_locked) {
 
1673
        log_messages.printf(MSG_CRITICAL,
 
1674
            "Found too many locked slots (%d>%d) - increase array size",
 
1675
            slots_locked, max_locked
 
1676
        );
 
1677
    }
 
1678
}
 
1679
#endif
 
1680
 
 
1681
const char *BOINC_RCSID_32dcd335e7 = "$Id: sched_send.C 15176 2008-05-12 20:25:35Z romw $";