38
int ffindex_apply_by_entry(char *data, ffindex_index_t* index, ffindex_entry_t* entry, char* program_name, char** program_argv, FILE* data_file_out, FILE* index_file_out, size_t *offset)
38
int ffindex_apply_by_entry(char *data, ffindex_index_t* index, ffindex_entry_t* entry, char* program_name, char** program_argv, FILE* data_file_out, FILE* index_file_out, size_t *offset, FILE* log_file)
41
41
int capture_stdout = (data_file_out != NULL);
149
149
waitpid(child_pid, &status, 0);
150
fprintf(stderr, "%s\t%ld\t%ld\t%d\n", entry->name, entry->offset, entry->length, WEXITSTATUS(status));
151
fprintf(log_file, "%s\t%ld\t%ld\t%d\n", entry->name, entry->offset, entry->length, WEXITSTATUS(status));
167
mpi_error = MPI_Init(&argn, &argv);
168
mpi_error = MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank);
169
mpi_error = MPI_Comm_size(MPI_COMM_WORLD, &mpi_num_procs);
168
mpi_error = MPI_Init(&argn, &argv); if(mpi_error) goto EXCEPTION;
169
mpi_error = MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank); if(mpi_error) goto EXCEPTION;
170
mpi_error = MPI_Comm_size(MPI_COMM_WORLD, &mpi_num_procs); if(mpi_error) goto EXCEPTION;
171
172
int opt, merge = 1;
172
173
char *data_filename_out = NULL,
173
*index_filename_out = NULL;
174
*index_filename_out = NULL,
175
*log_filename = NULL;
175
while ((opt = getopt(argn, argv, "d:i:m")) != -1)
177
while ((opt = getopt(argn, argv, "d:i:mql:")) != -1)
182
log_filename = optarg;
191
196
if(argn - optind < 3)
193
fprintf(stderr, "Not enough arguments %d.\n", optind - argn);
194
fprintf(stderr, "USAGE: %s [-m] -d DATA_FILENAME_OUT -i INDEX_FILENAME_OUT DATA_FILENAME INDEX_FILENAME -- PROGRAM [PROGRAM_ARGS]*\n"
198
fprintf(stderr, "ERROR Not enough arguments.\n\n");
200
fprintf(stderr, "USAGE: %s [-m] [-q] [-l LOG_FILE] -d DATA_FILENAME_OUT -i INDEX_FILENAME_OUT DATA_FILENAME INDEX_FILENAME -- PROGRAM [PROGRAM_ARGS]*\n"
201
"\t-l\tLOG_FILE like an FFindex file but with the return value of apply added.\n"
202
"\t\tThis file can be processed and used as FFindex file.\n"
203
"\t\tE.g. grep \"\t1$\" foo.log | cut -f 1-3 > foo-special.ffindex.\n"
204
"\t\twhere maybe the ffindex_apply_mpi ran with -- perl -e 'return 1 if(/^SPECIAL/)'\n"
195
205
"\t-m\tDo not merge the FFindex parts generated by the different MPI processes\n"
196
206
"\t\tThis is useful for large MPI Jobs where merge time might be accounted.\n"
197
"\nDesigned and implemented by Andy Hauser <hauser@genzentrum.lmu.de>.\n",
207
"\t-d\tFFDATA output file, stdout of program will be captured\n"
208
"\t-i\tFFINDEX output file, entries get the same name as input entry\n"
201
214
read_buffer = malloc(400 * 1024 * 1024);
202
215
char *data_filename = argv[optind++];
210
223
if( data_file == NULL) { fferror_print(__FILE__, __LINE__, argv[0], data_filename); exit(EXIT_FAILURE); }
211
224
if(index_file == NULL) { fferror_print(__FILE__, __LINE__, argv[0], index_filename); exit(EXIT_FAILURE); }
213
FILE *data_file_out = NULL, *index_file_out = NULL;
214
226
// Setup one output FFindex for each MPI process
227
FILE *data_file_out = NULL,
228
*index_file_out = NULL;
215
229
if(data_filename_out != NULL && index_filename_out != NULL)
217
231
char* data_filename_out_rank = malloc(FILENAME_MAX);
218
232
char* index_filename_out_rank = malloc(FILENAME_MAX);
219
snprintf( data_filename_out_rank, FILENAME_MAX, "%s.%d", data_filename_out, mpi_rank);
233
snprintf( data_filename_out_rank, FILENAME_MAX, "%s.%d", data_filename_out, mpi_rank);
220
234
snprintf(index_filename_out_rank, FILENAME_MAX, "%s.%d", index_filename_out, mpi_rank);
221
235
data_file_out = fopen(data_filename_out_rank, "w+");
222
236
index_file_out = fopen(index_filename_out_rank, "w+");
224
237
if( data_file_out == NULL) { fferror_print(__FILE__, __LINE__, argv[0], data_filename_out); exit(EXIT_FAILURE); }
225
if(index_file_out == NULL) { fferror_print(__FILE__, __LINE__, argv[0], index_filename_out); exit(EXIT_FAILURE); }
238
if(index_file_out == NULL) { fferror_print(__FILE__, __LINE__, argv[0], index_filename_out); exit(EXIT_FAILURE); }
241
FILE *log_file = NULL;
245
char* log_filename_rank = malloc(FILENAME_MAX);
246
snprintf(log_filename_rank, FILENAME_MAX, "%s.%d", index_filename_out, mpi_rank);
247
log_file = fopen(log_filename_rank, "w+");
248
if( log_file == NULL) { fferror_print(__FILE__, __LINE__, argv[0], log_filename_rank); exit(EXIT_FAILURE); }
228
251
int capture_stdout = (data_file_out != NULL);
261
284
for(size_t entry_index = range_start; entry_index < range_end; entry_index++)
263
286
ffindex_entry_t* entry = ffindex_get_entry_by_index(index, entry_index);
264
if(entry == NULL) { perror(entry->name); return errno; }
265
int error = ffindex_apply_by_entry(data, index, entry, program_name, program_argv, data_file_out, index_file_out, &offset);
287
if(entry == NULL) { perror(entry->name); goto EXCEPTION; }
288
int error = ffindex_apply_by_entry(data, index, entry, program_name, program_argv, data_file_out, index_file_out, &offset, log_file);
267
290
{ perror(entry->name); break; }
273
296
ffindex_entry_t* entry = ffindex_get_entry_by_index(index, left_over_entry_index);
274
297
if(entry == NULL) { perror(entry->name); return errno; }
275
298
//fprintf(stderr, "handling left over: %ld\n", left_over_entry_index);
276
int error = ffindex_apply_by_entry(data, index, entry, program_name, program_argv, data_file_out, index_file_out, &offset);
299
int error = ffindex_apply_by_entry(data, index, entry, program_name, program_argv, data_file_out, index_file_out, &offset, log_file);
278
301
perror(entry->name);
289
312
// merge FFindexes in master
290
if(merge && data_filename_out != NULL && mpi_rank == 0)
313
if(mpi_rank == 0 && merge && data_filename_out != NULL)
292
315
char* merge_command = malloc(FILENAME_MAX * 5);
293
316
for(int i = 0; i < mpi_num_procs; i++)
302
325
snprintf(merge_command, FILENAME_MAX, "%s.%d", index_filename_out, i);
303
326
unlink(merge_command);
330
fprintf(stderr, "merge command failed: %s\n", merge_command);
310
338
return EXIT_SUCCESS;
344
fprintf(stderr, "\nEXCEPTION ffindex_apply_mpi in rank: %d\n", mpi_rank);
313
351
/* vim: ts=2 sw=2 et