~ubuntu-branches/ubuntu/lucid/boinc/lucid

« back to all changes in this revision

Viewing changes to sched/transitioner.C

  • Committer: Bazaar Package Importer
  • Author(s): Frank S. Thomas, Frank S. Thomas
  • Date: 2008-05-31 08:02:47 UTC
  • mfrom: (1.1.8 upstream)
  • Revision ID: james.westby@ubuntu.com-20080531080247-4ce890lp2rc768cr
Tags: 6.2.7-1
[ Frank S. Thomas ]
* New upstream release.
  - BOINC Manager: Redraw disk usage charts immediately after connecting to
    a (different) client. (closes: 463823)
* debian/copyright:
  - Added the instructions from debian/README.Debian-source about how
    repackaged BOINC tarballs can be reproduced because DevRef now
    recommends to put this here instead of in the afore-mentioned file.
  - Updated for the new release.
* Removed the obsolete debian/README.Debian-source.
* For consistency upstream renamed the core client and the command tool
  ("boinc_client" to "boinc" and "boinc_cmd" to "boinccmd"). Done the same
  in all packages and created symlinks with the old names for the binaries
  and man pages. Also added an entry in debian/boinc-client.NEWS explaining
  this change.
* debian/rules: Do not list Makefile.ins in the clean target individually,
  just remove all that can be found.

Show diffs side-by-side

added added

removed removed

Lines of Context:
15
15
// To view the GNU Lesser General Public License visit
16
16
// http://www.gnu.org/copyleft/lesser.html
17
17
// or write to the Free Software Foundation, Inc.,
18
 
// 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
18
// 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
19
19
 
20
20
// transitioner - handle transitions in the state of a WU
21
21
//    - a result has become DONE (via timeout or client reply)
31
31
#include "config.h"
32
32
#include <vector>
33
33
#include <unistd.h>
 
34
#include <cstring>
34
35
#include <climits>
 
36
#include <cstdlib>
 
37
#include <string>
35
38
#include <sys/time.h>
36
39
 
37
40
#include "boinc_db.h"
38
41
#include "util.h"
39
42
#include "backend_lib.h"
 
43
#include "common_defs.h"
 
44
 
40
45
#include "sched_config.h"
41
46
#include "sched_util.h"
42
47
#include "sched_msgs.h"
50
55
#else
51
56
#define SLEEP_INTERVAL  5
52
57
#endif
53
 
#define BATCH_INSERT    1
54
58
 
55
59
int startup_time;
56
 
SCHED_CONFIG config;
57
60
R_RSA_PRIVATE_KEY key;
58
61
int mod_n, mod_i;
59
62
bool do_mod = false;
74
77
    int retval = host.lookup_id(hostid);
75
78
    if (retval) return retval;
76
79
    compute_avg_turnaround(host, delay_bound);
77
 
    if (host.max_results_day <= 0 || host.max_results_day > config.daily_result_quota) {
 
80
    if (host.max_results_day == 0 || host.max_results_day > config.daily_result_quota) {
78
81
        host.max_results_day = config.daily_result_quota;
79
82
    }
80
83
    host.max_results_day -= 1;
100
103
    bool all_over_and_validated, have_new_result_to_validate, do_delete;
101
104
    unsigned int i;
102
105
 
 
106
    TRANSITIONER_ITEM& wu_item = items[0];
 
107
    TRANSITIONER_ITEM wu_item_original = wu_item;
 
108
 
 
109
    // "assigned" WUs aren't supposed to pass through the transitioner.
 
110
    // If we get one, it's an error
 
111
    //
 
112
    if (config.enable_assignment && strstr(wu_item.name, ASSIGNED_WU_STR)) {
 
113
        DB_WORKUNIT wu;
 
114
        char buf[256];
 
115
 
 
116
        wu.id = wu_item.id;
 
117
        log_messages.printf(MSG_CRITICAL,
 
118
            "Assigned WU %d unexpectedly found by transitioner\n", wu.id
 
119
        );
 
120
        sprintf(buf, "transition_time=%d", INT_MAX);
 
121
        retval = wu.update_field(buf);
 
122
        if (retval) {
 
123
            log_messages.printf(MSG_CRITICAL,
 
124
                "update_field failed %d\n", retval
 
125
            );
 
126
        }
 
127
        return 0;
 
128
    }
 
129
 
103
130
    // count up the number of results in various states,
104
131
    // and check for timed-out results
105
132
    //
115
142
    have_new_result_to_validate = false;
116
143
    int rs, max_result_suffix = -1;
117
144
 
118
 
    TRANSITIONER_ITEM& wu_item = items[0];
119
 
    TRANSITIONER_ITEM wu_item_original = wu_item;
120
 
 
121
145
    // Scan the WU's results, and find the canonical result if there is one
122
146
    //
123
147
    canonical_result_index = -1;
132
156
    }
133
157
 
134
158
    if (wu_item.canonical_resultid && (canonical_result_index == -1)) {
135
 
        log_messages.printf(
136
 
            SCHED_MSG_LOG::MSG_CRITICAL,
 
159
        log_messages.printf(MSG_CRITICAL,
137
160
            "[WU#%d %s] can't find canonical result\n",
138
161
            wu_item.id, wu_item.name
139
162
        );
171
194
            break;
172
195
        case RESULT_SERVER_STATE_IN_PROGRESS:
173
196
            if (res_item.res_report_deadline < now) {
174
 
                log_messages.printf(
175
 
                    SCHED_MSG_LOG::MSG_NORMAL,
 
197
                log_messages.printf(MSG_NORMAL,
176
198
                    "[WU#%d %s] [RESULT#%d %s] result timed out (%d < %d) server_state:IN_PROGRESS=>OVER; outcome:NO_REPLY\n",
177
199
                    wu_item.id, wu_item.name, res_item.res_id, res_item.res_name,
178
200
                    res_item.res_report_deadline, (int)now
181
203
                res_item.res_outcome = RESULT_OUTCOME_NO_REPLY;
182
204
                retval = transitioner.update_result(res_item);
183
205
                if (retval) {
184
 
                    log_messages.printf(
185
 
                        SCHED_MSG_LOG::MSG_CRITICAL,
 
206
                    log_messages.printf(MSG_CRITICAL,
186
207
                        "[WU#%d %s] [RESULT#%d %s] update_result(): %d\n",
187
208
                        wu_item.id, wu_item.name, res_item.res_id,
188
209
                        res_item.res_name, retval
199
220
            nover++;
200
221
            switch (res_item.res_outcome) {
201
222
            case RESULT_OUTCOME_COULDNT_SEND:
202
 
                log_messages.printf(
203
 
                    SCHED_MSG_LOG::MSG_NORMAL,
 
223
                log_messages.printf(MSG_NORMAL,
204
224
                    "[WU#%d %s] [RESULT#%d %s] result couldn't be sent\n",
205
225
                    wu_item.id, wu_item.name, res_item.res_id, res_item.res_name
206
226
                );
211
231
                    if (canonical_result_files_deleted) {
212
232
                        res_item.res_validate_state = VALIDATE_STATE_TOO_LATE;
213
233
                        retval = transitioner.update_result(res_item);
214
 
                        log_messages.printf(
215
 
                            SCHED_MSG_LOG::MSG_NORMAL,
 
234
                        log_messages.printf(MSG_NORMAL,
216
235
                            "[WU#%d %s] [RESULT#%d %s] validate_state:INIT=>TOO_LATE retval %d\n",
217
236
                            wu_item.id, wu_item.name, res_item.res_id,
218
237
                            res_item.res_name, retval
239
258
        }
240
259
    }
241
260
 
242
 
    log_messages.printf(
243
 
        SCHED_MSG_LOG::MSG_DEBUG,
 
261
    log_messages.printf(MSG_DEBUG,
244
262
        "[WU#%d %s] %d results: unsent %d, in_progress %d, over %d (success %d, error %d, couldnt_send %d, no_reply %d, didnt_need %d)\n",
245
263
        wu_item.id, wu_item.name, ntotal, nunsent, ninprogress, nover,
246
264
        nsuccess, nerrors, ncouldnt_send, nno_reply, ndidnt_need
250
268
    //
251
269
    if (have_new_result_to_validate && (nsuccess >= wu_item.min_quorum)) {
252
270
        wu_item.need_validate = true;
253
 
        log_messages.printf(
254
 
            SCHED_MSG_LOG::MSG_NORMAL,
 
271
        log_messages.printf(MSG_NORMAL,
255
272
            "[WU#%d %s] need_validate:=>true\n", wu_item.id, wu_item.name
256
273
        );
257
274
    }
266
283
    // if WU has results with errors and no success yet,
267
284
    // reset homogeneous redundancy class to give other platforms a try
268
285
    //
269
 
    if (nerrors & !(nsuccess | ninprogress)) {
 
286
    if (nerrors & !(nsuccess || ninprogress)) {
270
287
        wu_item.hr_class = 0;
271
288
    }
272
289
 
273
290
    if (nerrors > wu_item.max_error_results) {
274
 
        log_messages.printf(
275
 
            SCHED_MSG_LOG::MSG_NORMAL,
 
291
        log_messages.printf(MSG_NORMAL,
276
292
            "[WU#%d %s] WU has too many errors (%d errors for %d results)\n",
277
293
            wu_item.id, wu_item.name, nerrors, (int)items.size()
278
294
        );
279
295
        wu_item.error_mask |= WU_ERROR_TOO_MANY_ERROR_RESULTS;
280
296
    }
281
297
    if ((int)items.size() > wu_item.max_total_results) {
282
 
        log_messages.printf(
283
 
            SCHED_MSG_LOG::MSG_NORMAL,
 
298
        log_messages.printf(MSG_NORMAL,
284
299
            "[WU#%d %s] WU has too many total results (%d)\n",
285
300
            wu_item.id, wu_item.name, (int)items.size()
286
301
        );
297
312
                bool update_result = false;
298
313
                switch(res_item.res_server_state) {
299
314
                case RESULT_SERVER_STATE_UNSENT:
300
 
                    log_messages.printf(
301
 
                        SCHED_MSG_LOG::MSG_NORMAL,
 
315
                    log_messages.printf(MSG_NORMAL,
302
316
                        "[WU#%d %s] [RESULT#%d %s] server_state:UNSENT=>OVER; outcome:=>DIDNT_NEED\n",
303
317
                        wu_item.id, wu_item.name, res_item.res_id, res_item.res_name
304
318
                    );
321
335
                if (update_result) {
322
336
                    retval = transitioner.update_result(res_item);
323
337
                    if (retval) {
324
 
                        log_messages.printf(
325
 
                            SCHED_MSG_LOG::MSG_CRITICAL,
 
338
                        log_messages.printf(MSG_CRITICAL,
326
339
                            "[WU#%d %s] [RESULT#%d %s] result.update() == %d\n",
327
340
                            wu_item.id, wu_item.name, res_item.res_id, res_item.res_name, retval
328
341
                        );
332
345
        }
333
346
        if (wu_item.assimilate_state == ASSIMILATE_INIT) {
334
347
            wu_item.assimilate_state = ASSIMILATE_READY;
335
 
            log_messages.printf(
336
 
                SCHED_MSG_LOG::MSG_NORMAL,
 
348
            log_messages.printf(MSG_NORMAL,
337
349
                "[WU#%d %s] error_mask:%d assimilate_state:INIT=>READY\n",
338
350
                wu_item.id, wu_item.name, wu_item.error_mask
339
351
            );
340
352
        }
341
 
    } else if (wu_item.assimilate_state == ASSIMILATE_INIT) {
 
353
    } else if (wu_item.canonical_resultid == 0) {
342
354
        // Here if no WU-level error.
343
355
        // Generate new results if needed.
344
356
        // NOTE: n must be signed
348
360
        char value_buf[MAX_QUERY_LEN];
349
361
        if (n > 0) {
350
362
            log_messages.printf(
351
 
                SCHED_MSG_LOG::MSG_NORMAL,
 
363
                MSG_NORMAL,
352
364
                "[WU#%d %s] Generating %d more results (%d target - %d unsent - %d in progress - %d success)\n",
353
365
                wu_item.id, wu_item.name, n, wu_item.target_nresults, nunsent, ninprogress, nsuccess
354
366
            );
357
369
                char rtfpath[256];
358
370
                sprintf(rtfpath, "../%s", wu_item.result_template_file);
359
371
                int priority_increase = 0;
360
 
                if ( nover && config.reliable_priority_on_over ) {
361
 
                    priority_increase = priority_increase + config.reliable_priority_on_over;
 
372
                if (nover && config.reliable_priority_on_over) {
 
373
                    priority_increase += config.reliable_priority_on_over;
362
374
                } else if (nover && !nerrors && config.reliable_priority_on_over_except_error) {
363
 
                    priority_increase = priority_increase + config.reliable_priority_on_over_except_error;
 
375
                    priority_increase += config.reliable_priority_on_over_except_error;
364
376
                }
365
 
#ifdef BATCH_INSERT
366
 
                retval = create_result(
 
377
                retval = create_result_ti(
367
378
                    wu_item, rtfpath, suffix, key, config, value_buf, priority_increase
368
379
                );
369
380
                if (retval) {
370
 
                    log_messages.printf(
371
 
                        SCHED_MSG_LOG::MSG_CRITICAL,
372
 
                        "[WU#%d %s] create_result() %d\n",
 
381
                    log_messages.printf(MSG_CRITICAL,
 
382
                        "[WU#%d %s] create_result_ti() %d\n",
373
383
                        wu_item.id, wu_item.name, retval
374
384
                    );
375
385
                    return retval;
380
390
                    values += ",";
381
391
                    values += value_buf;
382
392
                }
383
 
#else
384
 
                retval = create_result(
385
 
                    wu_item, rtfpath, suffix, key, config, 0, priority_increase
386
 
                );
387
 
                if (retval) {
388
 
                    log_messages.printf(
389
 
                        SCHED_MSG_LOG::MSG_CRITICAL,
390
 
                        "[WU#%d %s] create_result() %d\n",
391
 
                        wu_item.id, wu_item.name, retval
392
 
                    );
393
 
                    return retval;
394
 
                }
395
 
#endif
396
393
            }
397
 
#ifdef BATCH_INSERT
398
394
            DB_RESULT r;
399
395
            retval = r.insert_batch(values);
400
396
            if (retval) {
401
 
                log_messages.printf(
402
 
                    SCHED_MSG_LOG::MSG_CRITICAL,
 
397
                log_messages.printf(MSG_CRITICAL,
403
398
                    "[WU#%d %s] insert_batch() %d\n",
404
399
                    wu_item.id, wu_item.name, retval
405
400
                );
406
401
                return retval;
407
402
            }
408
 
#endif
409
403
        }
410
404
    }
411
405
 
413
407
    //  - see if all over and validated
414
408
    //
415
409
    all_over_and_validated = true;
 
410
    bool all_over_and_ready_to_assimilate = true; // used for the defer assmilation
416
411
        int most_recently_returned = 0;
417
412
    for (i=0; i<items.size(); i++) {
418
413
        TRANSITIONER_ITEM& res_item = items[i];
424
419
                if (res_item.res_outcome == RESULT_OUTCOME_SUCCESS) {
425
420
                    if (res_item.res_validate_state == VALIDATE_STATE_INIT) {
426
421
                        all_over_and_validated = false;
 
422
                        all_over_and_ready_to_assimilate = false;
427
423
                    }
428
424
                } else if ( res_item.res_outcome == RESULT_OUTCOME_NO_REPLY ) {
429
425
                        if ( ( res_item.res_report_deadline + config.grace_period_hours*60*60 ) > now ) {
432
428
                }
433
429
            } else {
434
430
                all_over_and_validated = false;
 
431
                all_over_and_ready_to_assimilate = false;
435
432
            }
436
433
        }
437
434
    }
438
435
 
 
436
    // If we are defering assimilation until all results are over
 
437
    // and validated then when that happens we need to make sure
 
438
    // that it gets advanced to assimilate ready
 
439
    // the items.size is a kludge
 
440
    //
 
441
    if (all_over_and_ready_to_assimilate == true && wu_item.assimilate_state == ASSIMILATE_INIT && items.size() > 0 && wu_item.canonical_resultid > 0
 
442
    ) {
 
443
        wu_item.assimilate_state = ASSIMILATE_READY;
 
444
        log_messages.printf(MSG_NORMAL,
 
445
            "[WU#%d %s] Deferred assimililation now set to ASSIMILATE_STATE_READY\n",
 
446
            wu_item.id, wu_item.name
 
447
        );
 
448
    }
439
449
    // if WU is assimilated, trigger file deletion
440
450
    //
441
451
    if (wu_item.assimilate_state == ASSIMILATE_DONE && ((most_recently_returned + config.delete_delay_hours*60*60) < now)) {
443
453
        //
444
454
        if (all_over_and_validated && wu_item.file_delete_state == FILE_DELETE_INIT) {
445
455
            wu_item.file_delete_state = FILE_DELETE_READY;
446
 
            log_messages.printf(
447
 
                SCHED_MSG_LOG::MSG_DEBUG,
 
456
            log_messages.printf(MSG_DEBUG,
448
457
                "[WU#%d %s] ASSIMILATE_DONE: file_delete_state:=>READY\n",
449
458
                wu_item.id, wu_item.name
450
459
            );
474
483
                    break;
475
484
                }
476
485
                if (do_delete && res_item.res_file_delete_state == FILE_DELETE_INIT) {
477
 
                    log_messages.printf(
478
 
                        SCHED_MSG_LOG::MSG_NORMAL,
 
486
                    log_messages.printf(MSG_NORMAL,
479
487
                        "[WU#%d %s] [RESULT#%d %s] file_delete_state:=>READY\n",
480
488
                        wu_item.id, wu_item.name, res_item.res_id, res_item.res_name
481
489
                    );
483
491
 
484
492
                    retval = transitioner.update_result(res_item);
485
493
                    if (retval) {
486
 
                        log_messages.printf(
487
 
                            SCHED_MSG_LOG::MSG_CRITICAL,
 
494
                        log_messages.printf(MSG_CRITICAL,
488
495
                            "[WU#%d %s] [RESULT#%d %s] result.update() == %d\n",
489
496
                            wu_item.id, wu_item.name, res_item.res_id, res_item.res_name, retval
490
497
                        );
493
500
            }
494
501
        }
495
502
    } else if ( wu_item.assimilate_state == ASSIMILATE_DONE ) {
496
 
                log_messages.printf(SCHED_MSG_LOG::MSG_DEBUG, "[WU#%d %s] not checking for items to be ready for delete because the deferred delete time has not expired.  That will occur in %d seconds\n", wu_item.id, wu_item.name, most_recently_returned + config.delete_delay_hours*60*60-now);
 
503
                log_messages.printf(MSG_DEBUG,
 
504
            "[WU#%d %s] not checking for items to be ready for delete because the deferred delete time has not expired.  That will occur in %d seconds\n",
 
505
            wu_item.id,
 
506
            wu_item.name,
 
507
            most_recently_returned + config.delete_delay_hours*60*60-(int)now
 
508
        );
497
509
    }
498
510
 
499
511
    // compute next transition time = minimum timeout of in-progress results
547
559
            }
548
560
        }
549
561
    }
550
 
    // If either of the grace period or delete delay is less then the next transition time then use that value
 
562
    // If either of the grace period or delete delay is less than
 
563
    // the next transition time then use that value
 
564
    //
551
565
    if ( max_grace_or_delay_time < wu_item.transition_time && max_grace_or_delay_time > now && ninprogress == 0) {
552
566
        wu_item.transition_time = max_grace_or_delay_time;
553
 
        log_messages.printf(SCHED_MSG_LOG::MSG_NORMAL,"[WU#%d %s] Delaying transition due to grace period or delete day.  New transition time = %d sec\n",wu_item.id, wu_item.name, wu_item.transition_time);    
 
567
        log_messages.printf(MSG_NORMAL,
 
568
            "[WU#%d %s] Delaying transition due to grace period or delete day.  New transition time = %d sec\n",
 
569
            wu_item.id, wu_item.name, wu_item.transition_time
 
570
        );
554
571
    }
555
572
    
556
573
    // If transition time is in the past,
557
574
    // the system is bogged down and behind schedule.
558
 
    // Delay processing of the WU by an amount DOUBLE the amount
559
 
    // we are behind, but not less than 60 secs or more than
560
 
    // one day.
 
575
    // Delay processing of the WU by an amount DOUBLE the amount we are behind,
 
576
    // but not less than 60 secs or more than one day.
 
577
    //
561
578
    if (wu_item.transition_time < now) {
562
579
        int extra_delay = 2*(now - wu_item.transition_time);
563
580
        if (extra_delay < 60) extra_delay = 60;
564
581
        if (extra_delay > 86400) extra_delay = 86400;
565
 
        log_messages.printf(
566
 
            SCHED_MSG_LOG::MSG_DEBUG,
 
582
        log_messages.printf(MSG_DEBUG,
567
583
            "[WU#%d %s] transition time in past: adding extra delay %d sec\n",
568
584
            wu_item.id, wu_item.name, extra_delay
569
585
        );
570
586
        wu_item.transition_time = now + extra_delay;
571
587
    }
572
588
 
573
 
    log_messages.printf(
574
 
        SCHED_MSG_LOG::MSG_DEBUG,
 
589
    log_messages.printf(MSG_DEBUG,
575
590
        "[WU#%d %s] setting transition_time to %d\n",
576
591
        wu_item.id, wu_item.name, wu_item.transition_time
577
592
    );
578
593
 
579
594
    retval = transitioner.update_workunit(wu_item, wu_item_original);
580
595
    if (retval) {
581
 
        log_messages.printf(
582
 
            SCHED_MSG_LOG::MSG_CRITICAL,
 
596
        log_messages.printf(MSG_CRITICAL,
583
597
            "[WU#%d %s] workunit.update() == %d\n",
584
598
            wu_item.id, wu_item.name, retval
585
599
        );
605
619
 
606
620
        retval = handle_wu(transitioner, items);
607
621
        if (retval) {
608
 
            log_messages.printf(
609
 
                SCHED_MSG_LOG::MSG_CRITICAL,
 
622
            log_messages.printf(MSG_CRITICAL,
610
623
                "[WU#%d %s] handle_wu: %d; quitting\n",
611
624
                wu_item.id, wu_item.name, retval
612
625
            );
623
636
 
624
637
    retval = boinc_db.open(config.db_name, config.db_host, config.db_user, config.db_passwd);
625
638
    if (retval) {
626
 
        log_messages.printf(SCHED_MSG_LOG::MSG_CRITICAL, "boinc_db.open: %d\n", retval);
 
639
        log_messages.printf(MSG_CRITICAL, "boinc_db.open: %d\n", retval);
627
640
        exit(1);
628
641
    }
629
642
 
630
643
    while (1) {
 
644
        log_messages.printf(MSG_DEBUG, "doing a pass\n");
631
645
        if (!do_pass()) {
632
646
            if (one_pass) break;
 
647
            log_messages.printf(MSG_DEBUG, "sleeping %d\n", SLEEP_INTERVAL);
633
648
            sleep(SLEEP_INTERVAL);
634
649
        }
635
650
    }
655
670
 
656
671
    retval = config.parse_file("..");
657
672
    if (retval) {
658
 
        log_messages.printf(SCHED_MSG_LOG::MSG_CRITICAL, "can't read config file\n");
 
673
        log_messages.printf(MSG_CRITICAL, "can't read config file\n");
659
674
        exit(1);
660
675
    }
661
676
 
662
677
    sprintf(path, "%s/upload_private", config.key_dir);
663
678
    retval = read_key_file(path, key);
664
679
    if (retval) {
665
 
        log_messages.printf(SCHED_MSG_LOG::MSG_CRITICAL, "can't read key\n");
 
680
        log_messages.printf(MSG_CRITICAL, "can't read key\n");
666
681
        exit(1);
667
682
    }
668
683
 
669
 
    log_messages.printf(SCHED_MSG_LOG::MSG_NORMAL, "Starting\n");
 
684
    log_messages.printf(MSG_NORMAL, "Starting\n");
670
685
 
671
686
    install_stop_signal_handler();
672
687
 
673
688
    main_loop();
674
689
}
675
690
 
676
 
const char *BOINC_RCSID_be98c91511 = "$Id: transitioner.C 12773 2007-05-29 23:41:31Z boincadm $";
 
691
const char *BOINC_RCSID_be98c91511 = "$Id: transitioner.C 14908 2008-03-13 23:35:13Z davea $";