15
15
// To view the GNU Lesser General Public License visit
16
16
// http://www.gnu.org/copyleft/lesser.html
17
17
// or write to the Free Software Foundation, Inc.,
18
// 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18
// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20
20
// transitioner - handle transitions in the state of a WU
21
21
// - a result has become DONE (via timeout or client reply)
74
77
int retval = host.lookup_id(hostid);
75
78
if (retval) return retval;
76
79
compute_avg_turnaround(host, delay_bound);
77
if (host.max_results_day <= 0 || host.max_results_day > config.daily_result_quota) {
80
if (host.max_results_day == 0 || host.max_results_day > config.daily_result_quota) {
78
81
host.max_results_day = config.daily_result_quota;
80
83
host.max_results_day -= 1;
100
103
bool all_over_and_validated, have_new_result_to_validate, do_delete;
106
TRANSITIONER_ITEM& wu_item = items[0];
107
TRANSITIONER_ITEM wu_item_original = wu_item;
109
// "assigned" WUs aren't supposed to pass through the transitioner.
110
// If we get one, it's an error
112
if (config.enable_assignment && strstr(wu_item.name, ASSIGNED_WU_STR)) {
117
log_messages.printf(MSG_CRITICAL,
118
"Assigned WU %d unexpectedly found by transitioner\n", wu.id
120
sprintf(buf, "transition_time=%d", INT_MAX);
121
retval = wu.update_field(buf);
123
log_messages.printf(MSG_CRITICAL,
124
"update_field failed %d\n", retval
103
130
// count up the number of results in various states,
104
131
// and check for timed-out results
115
142
have_new_result_to_validate = false;
116
143
int rs, max_result_suffix = -1;
118
TRANSITIONER_ITEM& wu_item = items[0];
119
TRANSITIONER_ITEM wu_item_original = wu_item;
121
145
// Scan the WU's results, and find the canonical result if there is one
123
147
canonical_result_index = -1;
172
195
case RESULT_SERVER_STATE_IN_PROGRESS:
173
196
if (res_item.res_report_deadline < now) {
175
SCHED_MSG_LOG::MSG_NORMAL,
197
log_messages.printf(MSG_NORMAL,
176
198
"[WU#%d %s] [RESULT#%d %s] result timed out (%d < %d) server_state:IN_PROGRESS=>OVER; outcome:NO_REPLY\n",
177
199
wu_item.id, wu_item.name, res_item.res_id, res_item.res_name,
178
200
res_item.res_report_deadline, (int)now
181
203
res_item.res_outcome = RESULT_OUTCOME_NO_REPLY;
182
204
retval = transitioner.update_result(res_item);
185
SCHED_MSG_LOG::MSG_CRITICAL,
206
log_messages.printf(MSG_CRITICAL,
186
207
"[WU#%d %s] [RESULT#%d %s] update_result(): %d\n",
187
208
wu_item.id, wu_item.name, res_item.res_id,
188
209
res_item.res_name, retval
200
221
switch (res_item.res_outcome) {
201
222
case RESULT_OUTCOME_COULDNT_SEND:
203
SCHED_MSG_LOG::MSG_NORMAL,
223
log_messages.printf(MSG_NORMAL,
204
224
"[WU#%d %s] [RESULT#%d %s] result couldn't be sent\n",
205
225
wu_item.id, wu_item.name, res_item.res_id, res_item.res_name
211
231
if (canonical_result_files_deleted) {
212
232
res_item.res_validate_state = VALIDATE_STATE_TOO_LATE;
213
233
retval = transitioner.update_result(res_item);
215
SCHED_MSG_LOG::MSG_NORMAL,
234
log_messages.printf(MSG_NORMAL,
216
235
"[WU#%d %s] [RESULT#%d %s] validate_state:INIT=>TOO_LATE retval %d\n",
217
236
wu_item.id, wu_item.name, res_item.res_id,
218
237
res_item.res_name, retval
243
SCHED_MSG_LOG::MSG_DEBUG,
261
log_messages.printf(MSG_DEBUG,
244
262
"[WU#%d %s] %d results: unsent %d, in_progress %d, over %d (success %d, error %d, couldnt_send %d, no_reply %d, didnt_need %d)\n",
245
263
wu_item.id, wu_item.name, ntotal, nunsent, ninprogress, nover,
246
264
nsuccess, nerrors, ncouldnt_send, nno_reply, ndidnt_need
251
269
if (have_new_result_to_validate && (nsuccess >= wu_item.min_quorum)) {
252
270
wu_item.need_validate = true;
254
SCHED_MSG_LOG::MSG_NORMAL,
271
log_messages.printf(MSG_NORMAL,
255
272
"[WU#%d %s] need_validate:=>true\n", wu_item.id, wu_item.name
266
283
// if WU has results with errors and no success yet,
267
284
// reset homogeneous redundancy class to give other platforms a try
269
if (nerrors & !(nsuccess | ninprogress)) {
286
if (nerrors & !(nsuccess || ninprogress)) {
270
287
wu_item.hr_class = 0;
273
290
if (nerrors > wu_item.max_error_results) {
275
SCHED_MSG_LOG::MSG_NORMAL,
291
log_messages.printf(MSG_NORMAL,
276
292
"[WU#%d %s] WU has too many errors (%d errors for %d results)\n",
277
293
wu_item.id, wu_item.name, nerrors, (int)items.size()
279
295
wu_item.error_mask |= WU_ERROR_TOO_MANY_ERROR_RESULTS;
281
297
if ((int)items.size() > wu_item.max_total_results) {
283
SCHED_MSG_LOG::MSG_NORMAL,
298
log_messages.printf(MSG_NORMAL,
284
299
"[WU#%d %s] WU has too many total results (%d)\n",
285
300
wu_item.id, wu_item.name, (int)items.size()
297
312
bool update_result = false;
298
313
switch(res_item.res_server_state) {
299
314
case RESULT_SERVER_STATE_UNSENT:
301
SCHED_MSG_LOG::MSG_NORMAL,
315
log_messages.printf(MSG_NORMAL,
302
316
"[WU#%d %s] [RESULT#%d %s] server_state:UNSENT=>OVER; outcome:=>DIDNT_NEED\n",
303
317
wu_item.id, wu_item.name, res_item.res_id, res_item.res_name
321
335
if (update_result) {
322
336
retval = transitioner.update_result(res_item);
325
SCHED_MSG_LOG::MSG_CRITICAL,
338
log_messages.printf(MSG_CRITICAL,
326
339
"[WU#%d %s] [RESULT#%d %s] result.update() == %d\n",
327
340
wu_item.id, wu_item.name, res_item.res_id, res_item.res_name, retval
333
346
if (wu_item.assimilate_state == ASSIMILATE_INIT) {
334
347
wu_item.assimilate_state = ASSIMILATE_READY;
336
SCHED_MSG_LOG::MSG_NORMAL,
348
log_messages.printf(MSG_NORMAL,
337
349
"[WU#%d %s] error_mask:%d assimilate_state:INIT=>READY\n",
338
350
wu_item.id, wu_item.name, wu_item.error_mask
341
} else if (wu_item.assimilate_state == ASSIMILATE_INIT) {
353
} else if (wu_item.canonical_resultid == 0) {
342
354
// Here if no WU-level error.
343
355
// Generate new results if needed.
344
356
// NOTE: n must be signed
348
360
char value_buf[MAX_QUERY_LEN];
350
362
log_messages.printf(
351
SCHED_MSG_LOG::MSG_NORMAL,
352
364
"[WU#%d %s] Generating %d more results (%d target - %d unsent - %d in progress - %d success)\n",
353
365
wu_item.id, wu_item.name, n, wu_item.target_nresults, nunsent, ninprogress, nsuccess
357
369
char rtfpath[256];
358
370
sprintf(rtfpath, "../%s", wu_item.result_template_file);
359
371
int priority_increase = 0;
360
if ( nover && config.reliable_priority_on_over ) {
361
priority_increase = priority_increase + config.reliable_priority_on_over;
372
if (nover && config.reliable_priority_on_over) {
373
priority_increase += config.reliable_priority_on_over;
362
374
} else if (nover && !nerrors && config.reliable_priority_on_over_except_error) {
363
priority_increase = priority_increase + config.reliable_priority_on_over_except_error;
375
priority_increase += config.reliable_priority_on_over_except_error;
366
retval = create_result(
377
retval = create_result_ti(
367
378
wu_item, rtfpath, suffix, key, config, value_buf, priority_increase
371
SCHED_MSG_LOG::MSG_CRITICAL,
372
"[WU#%d %s] create_result() %d\n",
381
log_messages.printf(MSG_CRITICAL,
382
"[WU#%d %s] create_result_ti() %d\n",
373
383
wu_item.id, wu_item.name, retval
381
391
values += value_buf;
384
retval = create_result(
385
wu_item, rtfpath, suffix, key, config, 0, priority_increase
389
SCHED_MSG_LOG::MSG_CRITICAL,
390
"[WU#%d %s] create_result() %d\n",
391
wu_item.id, wu_item.name, retval
399
395
retval = r.insert_batch(values);
402
SCHED_MSG_LOG::MSG_CRITICAL,
397
log_messages.printf(MSG_CRITICAL,
403
398
"[WU#%d %s] insert_batch() %d\n",
404
399
wu_item.id, wu_item.name, retval
413
407
// - see if all over and validated
415
409
all_over_and_validated = true;
410
bool all_over_and_ready_to_assimilate = true; // used for the defer assmilation
416
411
int most_recently_returned = 0;
417
412
for (i=0; i<items.size(); i++) {
418
413
TRANSITIONER_ITEM& res_item = items[i];
424
419
if (res_item.res_outcome == RESULT_OUTCOME_SUCCESS) {
425
420
if (res_item.res_validate_state == VALIDATE_STATE_INIT) {
426
421
all_over_and_validated = false;
422
all_over_and_ready_to_assimilate = false;
428
424
} else if ( res_item.res_outcome == RESULT_OUTCOME_NO_REPLY ) {
429
425
if ( ( res_item.res_report_deadline + config.grace_period_hours*60*60 ) > now ) {
434
430
all_over_and_validated = false;
431
all_over_and_ready_to_assimilate = false;
436
// If we are defering assimilation until all results are over
437
// and validated then when that happens we need to make sure
438
// that it gets advanced to assimilate ready
439
// the items.size is a kludge
441
if (all_over_and_ready_to_assimilate == true && wu_item.assimilate_state == ASSIMILATE_INIT && items.size() > 0 && wu_item.canonical_resultid > 0
443
wu_item.assimilate_state = ASSIMILATE_READY;
444
log_messages.printf(MSG_NORMAL,
445
"[WU#%d %s] Deferred assimililation now set to ASSIMILATE_STATE_READY\n",
446
wu_item.id, wu_item.name
439
449
// if WU is assimilated, trigger file deletion
441
451
if (wu_item.assimilate_state == ASSIMILATE_DONE && ((most_recently_returned + config.delete_delay_hours*60*60) < now)) {
444
454
if (all_over_and_validated && wu_item.file_delete_state == FILE_DELETE_INIT) {
445
455
wu_item.file_delete_state = FILE_DELETE_READY;
447
SCHED_MSG_LOG::MSG_DEBUG,
456
log_messages.printf(MSG_DEBUG,
448
457
"[WU#%d %s] ASSIMILATE_DONE: file_delete_state:=>READY\n",
449
458
wu_item.id, wu_item.name
476
485
if (do_delete && res_item.res_file_delete_state == FILE_DELETE_INIT) {
478
SCHED_MSG_LOG::MSG_NORMAL,
486
log_messages.printf(MSG_NORMAL,
479
487
"[WU#%d %s] [RESULT#%d %s] file_delete_state:=>READY\n",
480
488
wu_item.id, wu_item.name, res_item.res_id, res_item.res_name
484
492
retval = transitioner.update_result(res_item);
487
SCHED_MSG_LOG::MSG_CRITICAL,
494
log_messages.printf(MSG_CRITICAL,
488
495
"[WU#%d %s] [RESULT#%d %s] result.update() == %d\n",
489
496
wu_item.id, wu_item.name, res_item.res_id, res_item.res_name, retval
495
502
} else if ( wu_item.assimilate_state == ASSIMILATE_DONE ) {
496
log_messages.printf(SCHED_MSG_LOG::MSG_DEBUG, "[WU#%d %s] not checking for items to be ready for delete because the deferred delete time has not expired. That will occur in %d seconds\n", wu_item.id, wu_item.name, most_recently_returned + config.delete_delay_hours*60*60-now);
503
log_messages.printf(MSG_DEBUG,
504
"[WU#%d %s] not checking for items to be ready for delete because the deferred delete time has not expired. That will occur in %d seconds\n",
507
most_recently_returned + config.delete_delay_hours*60*60-(int)now
499
511
// compute next transition time = minimum timeout of in-progress results
550
// If either of the grace period or delete delay is less then the next transition time then use that value
562
// If either of the grace period or delete delay is less than
563
// the next transition time then use that value
551
565
if ( max_grace_or_delay_time < wu_item.transition_time && max_grace_or_delay_time > now && ninprogress == 0) {
552
566
wu_item.transition_time = max_grace_or_delay_time;
553
log_messages.printf(SCHED_MSG_LOG::MSG_NORMAL,"[WU#%d %s] Delaying transition due to grace period or delete day. New transition time = %d sec\n",wu_item.id, wu_item.name, wu_item.transition_time);
567
log_messages.printf(MSG_NORMAL,
568
"[WU#%d %s] Delaying transition due to grace period or delete day. New transition time = %d sec\n",
569
wu_item.id, wu_item.name, wu_item.transition_time
556
573
// If transition time is in the past,
557
574
// the system is bogged down and behind schedule.
558
// Delay processing of the WU by an amount DOUBLE the amount
559
// we are behind, but not less than 60 secs or more than
575
// Delay processing of the WU by an amount DOUBLE the amount we are behind,
576
// but not less than 60 secs or more than one day.
561
578
if (wu_item.transition_time < now) {
562
579
int extra_delay = 2*(now - wu_item.transition_time);
563
580
if (extra_delay < 60) extra_delay = 60;
564
581
if (extra_delay > 86400) extra_delay = 86400;
566
SCHED_MSG_LOG::MSG_DEBUG,
582
log_messages.printf(MSG_DEBUG,
567
583
"[WU#%d %s] transition time in past: adding extra delay %d sec\n",
568
584
wu_item.id, wu_item.name, extra_delay
570
586
wu_item.transition_time = now + extra_delay;
574
SCHED_MSG_LOG::MSG_DEBUG,
589
log_messages.printf(MSG_DEBUG,
575
590
"[WU#%d %s] setting transition_time to %d\n",
576
591
wu_item.id, wu_item.name, wu_item.transition_time
579
594
retval = transitioner.update_workunit(wu_item, wu_item_original);
582
SCHED_MSG_LOG::MSG_CRITICAL,
596
log_messages.printf(MSG_CRITICAL,
583
597
"[WU#%d %s] workunit.update() == %d\n",
584
598
wu_item.id, wu_item.name, retval
606
620
retval = handle_wu(transitioner, items);
609
SCHED_MSG_LOG::MSG_CRITICAL,
622
log_messages.printf(MSG_CRITICAL,
610
623
"[WU#%d %s] handle_wu: %d; quitting\n",
611
624
wu_item.id, wu_item.name, retval
624
637
retval = boinc_db.open(config.db_name, config.db_host, config.db_user, config.db_passwd);
626
log_messages.printf(SCHED_MSG_LOG::MSG_CRITICAL, "boinc_db.open: %d\n", retval);
639
log_messages.printf(MSG_CRITICAL, "boinc_db.open: %d\n", retval);
644
log_messages.printf(MSG_DEBUG, "doing a pass\n");
631
645
if (!do_pass()) {
632
646
if (one_pass) break;
647
log_messages.printf(MSG_DEBUG, "sleeping %d\n", SLEEP_INTERVAL);
633
648
sleep(SLEEP_INTERVAL);
656
671
retval = config.parse_file("..");
658
log_messages.printf(SCHED_MSG_LOG::MSG_CRITICAL, "can't read config file\n");
673
log_messages.printf(MSG_CRITICAL, "can't read config file\n");
662
677
sprintf(path, "%s/upload_private", config.key_dir);
663
678
retval = read_key_file(path, key);
665
log_messages.printf(SCHED_MSG_LOG::MSG_CRITICAL, "can't read key\n");
680
log_messages.printf(MSG_CRITICAL, "can't read key\n");
669
log_messages.printf(SCHED_MSG_LOG::MSG_NORMAL, "Starting\n");
684
log_messages.printf(MSG_NORMAL, "Starting\n");
671
686
install_stop_signal_handler();
676
const char *BOINC_RCSID_be98c91511 = "$Id: transitioner.C 12773 2007-05-29 23:41:31Z boincadm $";
691
const char *BOINC_RCSID_be98c91511 = "$Id: transitioner.C 14908 2008-03-13 23:35:13Z davea $";