2
2
* Sun Grid Engine Scheduler Event Generator implementation for GT4.
4
4
* See CREDITS file for attributions.
5
5
* See LICENSE file for license terms.
8
/* This #define is needed for the correct operation of the GLIBC strptime
8
/* This #define is needed for the correct operation of the GLIBC strptime
10
10
#define _XOPEN_SOURCE 1
18
18
#define SEG_SGE_DEBUG(level, message) \
19
19
GlobusDebugPrintf(SEG_SGE, level, message)
21
/* This error code is used to represent the
21
/* This error code is used to represent the
22
22
* "we want to skip a log entry" state. */
23
23
#define SEG_SGE_SKIP_LINE -10
88
88
/** simple test whether all we're looking for is the timestamp; */
89
89
globus_bool_t need_timestamp;
90
90
/** First timestamp in log-file */
91
time_t file_timestamp;
91
time_t file_timestamp;
92
92
/** file rotation number at 1st read - assumes N+1 old files labeled 0,1,2,3,4,5,6,...,N */
94
94
/** file inode for quick test of file rotation */
177
177
globus_l_sge_logfile_state_t * state);
180
/* Globus-specific module descriptor struct. This is
181
* inspected by the master globus-scheduler-event-generator process. */
183
/**** RJP 4.2 change -- comment out this declaration ***
185
globus_module_descriptor_t
186
globus_scheduler_event_module_ptr =
188
"globus_scheduler_event_generator_sge",
189
globus_l_sge_module_activate,
190
globus_l_sge_module_deactivate,
197
***************************************************/
199
180
/**** RJP 4.2 change -- replace aobve with this *****/
201
182
GlobusExtensionDefineModule(globus_seg_sge) =
212
/**************End 4.2 Change ******************/
215
/* This function will be used by the SEG calling code to
193
/**************End 4.2 Change ******************/
196
/* This function will be used by the SEG calling code to
216
197
* initialize this module. */
448
429
goto free_logfile_state_buffer_error;
451
/* Locate our logfile.
452
* Other DRMs need to know the current time to determine which
432
/* Locate our logfile.
433
* Other DRMs need to know the current time to determine which
453
434
* logfile to inspect. SGE just keeps a single large 'reporting' log. */
455
/* --- Above is true but we've implemented file rotation
456
* within the finding logfile routine rjp Jan.2008
436
/* --- Above is true but we've implemented file rotation
437
* within the finding logfile routine rjp Jan.2008
459
440
rc = globus_l_sge_find_logfile(logfile_state);
460
441
if (rc == GLOBUS_SUCCESS)
478
459
goto free_logfile_state_path_error;
481
/* Setup a callback so that our main read function will be
482
* invoked at a later time. */
483
/* rjp --> this used to include a pointer to the callback in the logfile_state struct.
484
* as &logfile_state->callback, But this causes a memory leak. Removed and put to NULL */
462
/* Setup a callback so that our main read function will be
463
* invoked at a later time.
485
465
result = globus_callback_register_oneshot(
488
468
globus_l_sge_read_callback,
586
566
globus_mutex_unlock(&globus_l_sge_mutex);
589
568
/* file may not have existed earlier rjp Jan.2008 */
590
569
if(state->fp == NULL)
592
571
if( state->path == NULL )
594
573
SEG_SGE_DEBUG(SEG_SGE_DEBUG_INFO, ("no file name available"));
599
rc = stat(state->path,&s);
578
rc = stat(state->path,&s);
602
581
SEG_SGE_DEBUG(SEG_SGE_DEBUG_INFO, ("opening file in callback"));
603
582
state->fp = fopen(state->path,"r");
604
583
state->file_inode = s.st_ino;
610
589
/* Provided that we have an open log filehandle.. */
615
594
- state->buffer_point;
617
596
SEG_SGE_DEBUG(SEG_SGE_DEBUG_TRACE,
618
("Reading a maximum of %u bytes from SGE reporting file = %s\n",
597
("Reading a maximum of %u bytes from SGE reporting file = %s\n",
619
598
max_to_read, state->path));
621
600
/* Actually perform the read. */
622
rc = fread(state->buffer + state->buffer_point +
601
rc = fread(state->buffer + state->buffer_point +
623
602
state->buffer_valid, 1, max_to_read, state->fp);
625
604
SEG_SGE_DEBUG(SEG_SGE_DEBUG_TRACE,
640
/* Or something bad has happened.
619
/* Or something bad has happened.
641
620
* This error state is currently unhandled... */
643
622
/* XXX: Read error */
647
/* Update our state to record that we've added more valid data
626
/* Update our state to record that we've added more valid data
648
627
* to the buffer. */
649
628
state->buffer_valid += rc;
651
/* Parse data. This function will also generate event
630
/* Parse data. This function will also generate event
652
631
* notifications and send them to the main server. */
653
632
SEG_SGE_DEBUG(SEG_SGE_DEBUG_TRACE, ("Parsing events in buffer.\n"));
654
633
rc = globus_l_sge_parse_events(state);
656
635
/* Move any remaining log data to the start of the buffer,
657
636
* overwriting any old log data that we have already parsed. */
658
SEG_SGE_DEBUG(SEG_SGE_DEBUG_TRACE,
637
SEG_SGE_DEBUG(SEG_SGE_DEBUG_TRACE,
659
638
("Cleaning buffer of parsed events.\n"));
660
639
rc = globus_l_sge_clean_buffer(state);
665
644
if( (eof_hit == GLOBUS_TRUE) )
668
647
/* Here we hand log-rotation possibility - by resetting file_number
669
* 1. check to see if log has been rotated and
670
* 2. reset file_number so that next file opened will
648
* 1. check to see if log has been rotated and
649
* 2. reset file_number so that next file opened will
671
650
* be correctly identified rjp Jan.2008
673
652
rc = globus_l_sge_check_rotated(state);
685
664
fclose(state->fp);
686
665
state->fp = NULL;
689
/* decrement file number.
668
/* decrement file number.
690
669
* Note if file was rotated while open,
691
* the above increment of file number
670
* the above increment of file number
692
671
* allows this to work rjp Jan.2008
707
686
/* we got a new file */
708
687
eof_hit = GLOBUS_FALSE;
714
693
/* Determine if we have reached the EOF on the logfile.
715
694
* If we have, set a moderately long delay.
716
695
* If not, set zero delay so we can read the rest! */
718
if (eof_hit == GLOBUS_TRUE || state->fp == NULL)
697
if (eof_hit == GLOBUS_TRUE || state->fp == NULL)
720
699
GlobusTimeReltimeSet(delay, 2, 0);
724
703
GlobusTimeReltimeSet(delay, 0, 0);
754
733
globus_cond_signal(&globus_l_sge_cond);
739
"FATAL: Unable to register callback. SGE SEG exiting\n");
757
742
globus_mutex_unlock(&globus_l_sge_mutex);
759
744
SEG_SGE_DEBUG(SEG_SGE_DEBUG_WARN,
763
748
/* globus_l_sge_read_callback() */
766
* Determine the SGE log file name.
751
* Determine the SGE log file name.
767
752
* This is actually really easy for SGE, because the filename doesn't change --
768
* it'll always be called 'reporting' and we'll already have the
753
* it'll always be called 'reporting' and we'll already have the
769
754
* exact path to use.
771
* above is now modified for simple reporting file rotation: rjp Jan.2008
756
* above is now modified for simple reporting file rotation: rjp Jan.2008
774
759
* SGE log state structure. The path field of the structure may be
775
760
* modified by this function.
794
779
if (state->path == NULL)
796
781
SEG_SGE_DEBUG(SEG_SGE_DEBUG_TRACE, ("allocating path\n"));
797
state->path = malloc(strlen(state->log_file) + 10);
782
state->path = malloc(strlen(state->log_file) + 10);
798
783
if (state->path == NULL)
800
785
rc = SEG_SGE_ERROR_OUT_OF_MEMORY;
809
794
stamp = mktime(&state->start_timestamp);
810
795
SEG_SGE_DEBUG(SEG_SGE_DEBUG_TRACE, ("input timestamp = %d\n",stamp));
812
state->file_number=-1;
797
state->file_number=-1;
813
798
while(!file_found)
815
SEG_SGE_DEBUG(SEG_SGE_DEBUG_TRACE,
800
SEG_SGE_DEBUG(SEG_SGE_DEBUG_TRACE,
816
801
("find file loop with file_number = %d\n",
817
802
state->file_number));
830
815
rc = globus_l_sge_get_file_timestamp(state);
832
SEG_SGE_DEBUG(SEG_SGE_DEBUG_TRACE,
817
SEG_SGE_DEBUG(SEG_SGE_DEBUG_TRACE,
833
818
("file = %s not found\n",state->path));
834
819
if(state->file_number >= 0)
842
827
file_found = GLOBUS_TRUE;
845
/* it's possible the direct file (file_number = -1)
830
/* it's possible the direct file (file_number = -1)
846
831
doesn't exist yet so set to skip over (see next if/else) */
847
832
state->file_timestamp = 0;
851
if( state->file_timestamp > 0 && state->file_timestamp < stamp )
853
SEG_SGE_DEBUG(SEG_SGE_DEBUG_INFO,
836
if( state->file_timestamp > 0 && state->file_timestamp < stamp )
838
SEG_SGE_DEBUG(SEG_SGE_DEBUG_INFO,
854
839
("found our file = %s with Timestamp %d \n",
855
840
state->path,state->file_timestamp));
856
841
file_found=GLOBUS_TRUE;
860
SEG_SGE_DEBUG(SEG_SGE_DEBUG_INFO,
845
SEG_SGE_DEBUG(SEG_SGE_DEBUG_INFO,
861
846
("Not file to use = %s with Timestamp %d \n",
862
847
state->path,state->file_timestamp));
865
850
state->file_number++;
867
/** now it;s possible under quick file rotations that no
868
* timestamp is put in the file, thus state->file_timestamp = 0.
869
* In this case, as written above, we'll appropriately skip
870
* that file: rjp Jan.2008
852
/** now it;s possible under quick file rotations that no
853
* timestamp is put in the file, thus state->file_timestamp = 0.
854
* In this case, as written above, we'll appropriately skip
855
* that file: rjp Jan.2008
875
860
rc = stat(state->path, &s);
876
state->file_inode = s.st_ino;
861
state->file_inode = s.st_ino;
885
870
SEG_SGE_DEBUG(SEG_SGE_DEBUG_ERROR,
886
871
("file %s doesn't exist\n", state->path));
939
924
/* globus_l_sge_find_logfile() */
944
* routine to set the file name based on the file rotation model.
929
* routine to set the file name based on the file rotation model.
945
930
* Here simply all rotated files have '.file_number' extension. If other
946
931
* models are defined, change this routine accordingly
957
SEG_SGE_DEBUG(SEG_SGE_DEBUG_INFO,
942
SEG_SGE_DEBUG(SEG_SGE_DEBUG_INFO,
958
943
("globus_l_sge_set_logfile_name()\n"));
959
944
if( state->file_number < 0)
961
SEG_SGE_DEBUG(SEG_SGE_DEBUG_TRACE,
946
SEG_SGE_DEBUG(SEG_SGE_DEBUG_TRACE,
962
947
("non-rotated file number \n"));
963
948
rc = sprintf(state->path,"%s",state->log_file);
964
949
state->old_log = GLOBUS_FALSE;
968
SEG_SGE_DEBUG(SEG_SGE_DEBUG_TRACE,
953
SEG_SGE_DEBUG(SEG_SGE_DEBUG_TRACE,
969
954
("rotated file file_number >= 0\n"));
970
955
rc = sprintf(state->path,"%s%s%d",state->log_file,".",state->file_number);
971
956
state->old_log = GLOBUS_TRUE;
976
961
/* globus_l_sge_set_logfile_name */
979
* Move any data in the state buffer to the beginning, to enable reusing
964
* Move any data in the state buffer to the beginning, to enable reusing
980
965
* buffer space which has already been parsed.
1061
* Simple routine to check inode number to see if it has changed.
1062
* If so we assume file has been rotated
1046
* Simple routine to check inode number to see if it has changed.
1047
* If so we assume file has been rotated
1089
1074
/* This function's job is to parse any whole events from our read buffer,
1090
* generate state update messages and deliver them to the main process.
1075
* generate state update messages and deliver them to the main process.
1092
1077
* It's now also used to grab the 1st timestamped entry in the reporting file
1093
1078
* when file rotation is activated . rjp Jan.2008
1116
1101
/* Find the next newline */
1117
while ( (status != SEG_SGE_FOUND_FILE_TIMESTAMP) &&
1102
while ( (status != SEG_SGE_FOUND_FILE_TIMESTAMP) &&
1118
1103
(eol = memchr(state->buffer + state->buffer_point,
1120
1105
state->buffer_valid)) != NULL)
1125
1110
SEG_SGE_DEBUG(SEG_SGE_DEBUG_TRACE,
1126
1111
("parsing line %s\n", state->buffer + state->buffer_point));
1128
rc = globus_l_sge_split_into_fields(state, &fields, &nfields);
1113
rc = globus_l_sge_split_into_fields(state, &fields, &nfields);
1130
1115
/* If split_into_fields fails, ignore the line.*/
1131
1116
if (rc != GLOBUS_SUCCESS)
1133
1118
SEG_SGE_DEBUG(SEG_SGE_DEBUG_WARN,
1134
("Failed to parse line %s\n",
1119
("Failed to parse line %s\n",
1135
1120
state->buffer + state->buffer_point));
1136
1121
goto free_fields;
1139
1124
/* If the first character is a '#', ignore the line. */
1140
1125
if (strstr(fields[0], "#") == fields[0]) {
1141
1126
SEG_SGE_DEBUG(SEG_SGE_DEBUG_TRACE,
1142
("Line '%s' is a comment, skipping.\n",
1127
("Line '%s' is a comment, skipping.\n",
1143
1128
state->buffer + state->buffer_point));
1144
1129
goto free_fields;
1201
1186
/* Batch accounting: resources consumed by the job */
1202
if (strstr(fields[1], "acct") == fields[1])
1187
if (strstr(fields[1], "acct") == fields[1])
1206
/* From the SGE 'reporting' man page:
1191
/* From the SGE 'reporting' man page:
1209
1194
* Indicates the problem which occurred in case a job could not be
1210
1195
* started on the execution host (e.g. because the owner of the job
1211
1196
* did not have a valid account on that machine). If Grid Engine
1229
1214
if ( failed != 0)
1231
1216
SEG_SGE_DEBUG(SEG_SGE_DEBUG_INFO,
1232
("New event: job %s has failed with exit status %d.\n",
1217
("New event: job %s has failed with exit status %d.\n",
1233
1218
job_id, exit_status));
1234
1219
rc = globus_scheduler_event_failed(stamp, job_id, failed);
1238
1223
SEG_SGE_DEBUG(SEG_SGE_DEBUG_INFO,
1239
("New event: job %s has done with exit status %d.\n",
1224
("New event: job %s has done with exit status %d.\n",
1240
1225
job_id, exit_status));
1241
1226
rc = globus_scheduler_event_done(stamp, job_id, exit_status);
1245
1230
else if (strstr(fields[1], "job_log") == fields[1])
1249
1234
/* Job state change. */
1322
1307
fclose(state->fp);
1323
1308
state->fp = NULL;
1326
1311
state->fp = fopen(state->path,"r");
1328
1313
if(state->fp == NULL)
1330
SEG_SGE_DEBUG(SEG_SGE_DEBUG_INFO,
1315
SEG_SGE_DEBUG(SEG_SGE_DEBUG_INFO,
1331
1316
(" unable to open file name = %s\n",state->path));
1335
1320
/* start with an empty buffer */
1336
1321
state->buffer_point = 0;
1337
1322
state->buffer_valid = 0;
1338
state->need_timestamp = GLOBUS_TRUE;
1323
state->need_timestamp = GLOBUS_TRUE;
1339
1324
state->file_timestamp = 0;
1341
while ( state->file_timestamp == 0 && !eof_hit )
1326
while ( state->file_timestamp == 0 && !eof_hit )
1343
1328
/* Calculate how much data will fit within the read-buffer. */
1344
1329
max_to_read = state->buffer_length - state->buffer_valid
1345
1330
- state->buffer_point;
1347
1332
SEG_SGE_DEBUG(SEG_SGE_DEBUG_TRACE,
1348
("Reading a maximum of %u bytes from SGE reporting file\n",
1333
("Reading a maximum of %u bytes from SGE reporting file\n",
1351
1336
/* Actually perform the read. */
1352
rc = fread(state->buffer + state->buffer_point +
1337
rc = fread(state->buffer + state->buffer_point +
1353
1338
state->buffer_valid, 1, max_to_read, state->fp);
1355
1340
SEG_SGE_DEBUG(SEG_SGE_DEBUG_TRACE,
1366
1351
/* try to find the file timestamp inside the buffer */
1367
1352
rc = globus_l_sge_parse_events(state);
1369
SEG_SGE_DEBUG(SEG_SGE_DEBUG_TRACE,
1354
SEG_SGE_DEBUG(SEG_SGE_DEBUG_TRACE,
1370
1355
(" Cleaning buffer of parsed events.\n"));
1371
1356
rc = globus_l_sge_clean_buffer(state);
1381
1366
/* End with an empty buffer */
1382
1367
state->buffer_point = 0;
1383
1368
state->buffer_valid = 0;
1384
state->need_timestamp = GLOBUS_FALSE;
1369
state->need_timestamp = GLOBUS_FALSE;
1386
1371
if(state->file_timestamp == 0 )
1388
SEG_SGE_DEBUG(SEG_SGE_DEBUG_TRACE,
1373
SEG_SGE_DEBUG(SEG_SGE_DEBUG_TRACE,
1389
1374
(" Could not get timestamp from file "));
1393
SEG_SGE_DEBUG(SEG_SGE_DEBUG_INFO,
1378
SEG_SGE_DEBUG(SEG_SGE_DEBUG_INFO,
1394
1379
("globus_l_sge_get_file_timestamp() exit.\n"));
1398
SEG_SGE_DEBUG(SEG_SGE_DEBUG_TRACE,
1383
SEG_SGE_DEBUG(SEG_SGE_DEBUG_TRACE,
1399
1384
("Get Timestamp Problem opening file %s\n",state->path));
1407
1392
* Log state structure. The string pointed to by
1408
* state-\>buffer + state-\>buffer_point is modified
1393
* state-\>buffer + state-\>buffer_point is modified
1409
1394
* @param fields
1410
1395
* Modified to point to a newly allocated array of char * pointers which
1411
1396
* point to the start of each field within the state buffer block.