3
Copyright (C) 2009- The University of Notre Dame
4
This software is distributed under the GNU General Public License.
5
See the file COPYING for details.
19
#include "work_queue.h"
20
#include "hash_table.h"
21
#include "stringtools.h"
26
#include "compressed_sequence.h"
28
/* The work queue that tasks are submitted to, and the table of sequences indexed by name; both are created in main(). */
static struct work_queue *queue = 0;
29
static struct hash_table *sequence_table = 0;
30
/* Port number passed to work_queue_create(). */
static int port = WORK_QUEUE_DEFAULT_PORT;
31
/* Path of the alignment program, filled in by find_executable() in main(). */
static char align_prog[1024];
32
/* Extra arguments placed on each task's command line after the program name. */
static const char *align_prog_args = "";
33
/* File names taken from the positional command-line arguments in main(). */
static const char *candidate_file_name;
34
static const char *sequence_file_name;
35
static const char *output_file_name;
37
/* Streams opened in main() from the names above. */
static FILE *sequence_file;
38
static FILE *candidate_file;
39
static FILE *output_file;
41
/* start_time is set once in main(); last_display_time is updated by display_progress(). */
static time_t start_time = 0;
42
static time_t last_display_time = 0;
44
/* Non-zero while the main loop should keep creating tasks. */
static int more_candidates = 1;
45
static int tasks_submitted = 0;
46
static int tasks_done = 0;
47
/* Sum of (finish_time - start_time) over completed tasks; display_progress() divides it by 1000000.0, so presumably microseconds -- TODO confirm. */
static timestamp_t tasks_runtime = 0;
48
/* Sum of total_transfer_time over completed tasks. */
static timestamp_t tasks_filetime = 0;
49
/* Counters shown (divided by 1000) in the progress table. */
static int candidates_loaded = 0;
50
static int sequences_loaded = 0;
52
/* Upper bound on the number of candidate pairs packed into one task. */
static int max_pairs_per_task = 10000;
54
/* Result codes returned by candidate_read(). */
#define CANDIDATE_SUCCESS 0
55
#define CANDIDATE_EOF 1
56
#define CANDIDATE_WAIT 2
58
/* Size of the buffers used for one candidate line and for each of its fields. */
#define CAND_FILE_LINE_MAX 4096
60
/* isspace() applied to the argument converted to unsigned char. */
#define unsigned_isspace(c) isspace((unsigned char) c)
62
/*
Print cmd followed by the CCTools major/minor/micro version numbers and the
build user, host, date and time to standard output.
*/
static void show_version(const char *cmd)
64
printf("%s version %d.%d.%d built by %s@%s on %s at %s\n", cmd, CCTOOLS_VERSION_MAJOR, CCTOOLS_VERSION_MINOR, CCTOOLS_VERSION_MICRO, BUILD_USER, BUILD_HOST, __DATE__, __TIME__);
67
/*
Print the usage line and the list of command-line options to standard output.
cmd is the program name shown in the usage line.  The defaults printed for
-p and -n are the current values of port and max_pairs_per_task.
*/
static void show_help(const char *cmd)
69
printf("Use: %s [options] <sand_align_kernel> <candidates.cand> <sequences.cfa> <overlaps.ovl>\n", cmd);
70
printf("where options are:\n");
71
printf(" -p <port> Port number for work queue master to listen on. (default: %d)\n",port);
72
printf(" -n <number> Maximum number of candidates per task. (default is %d)\n",max_pairs_per_task);
73
printf(" -e <args> Extra arguments to pass to the alignment program.\n");
74
printf(" -d <subsystem> Enable debugging for this subsystem. (Try -d all to start.)\n");
75
printf(" -F <#> Work Queue fast abort multiplier. (default is 10.)\n");
76
printf(" -o <file> Send debugging to this file.\n");
77
printf(" -v Show version string\n");
78
printf(" -h Show this help screen\n");
81
/*
Fetch the statistics of queue q and print one row of the progress table to
standard output, then record the time of this display in last_display_time.
NOTE(review): several lines of this function (including the condition that
decides when the two header lines are printed) are not visible in this excerpt.
*/
static void display_progress(struct work_queue *q)
83
struct work_queue_stats info;
84
/* Persists across calls; reset to row_limit where the header lines are printed. */
static int row_count = 0;
87
work_queue_get_stats(q, &info);
90
printf(" Total | Workers | Tasks Avg | K-Cand K-Seqs | Total\n");
91
printf(" Time | Idle Busy | Submit Idle Run Done Time | Loaded Loaded | Speedup\n");
92
row_count = row_limit;
95
printf("%6d | %4d %4d | %6d %4d %4d %6d %6.2lf | %6d %6d | %5.2lf\n",
96
/* Seconds elapsed since start_time. */
(int) (time(0) - start_time),
97
/* Workers in the init and ready states are reported together in the first worker column. */
info.workers_init + info.workers_ready,
103
/* Mean runtime per completed task, scaled by 1/1000000; 0 until a task has completed. */
tasks_done ? tasks_runtime / (double) tasks_done / 1000000.0 : 0,
104
candidates_loaded / 1000,
105
sequences_loaded / 1000,
106
/* Accumulated task runtime (scaled by 1/1000000) divided by elapsed seconds; 0 while no time has elapsed, which also avoids dividing by zero. */
(time(0)>start_time) ? (tasks_runtime/1000000.0) / (time(0)-start_time) : 0);
110
last_display_time = time(0);
115
Check to see that the output consists of an envelope [ ... ]
116
around some OVL records. If there are no good matches in the
117
output, we should see an envelope with nothing in it.
118
If the file is completely empty, then there is a problem and
119
we reject the output.
122
/*
Validate the text produced by an alignment task.  Whitespace is skipped with
unsigned_isspace() while scanning, and a D_NOTICE message quoting the whole
output is logged when the text does not begin with '[' or does not end with ']'.
NOTE(review): the comparisons and return statements are not visible in this
excerpt, so the exact return value in each case cannot be stated from here.
*/
static char * confirm_output( char *output )
127
/* Skip leading whitespace. */
while(unsigned_isspace(*s)) s++;
130
debug(D_NOTICE,"aligment output did not begin with [:\n%s\n",output);
136
while(unsigned_isspace(*s)) s++;
144
/* Walk backwards over whitespace. */
while(unsigned_isspace(*s)) s--;
147
debug(D_NOTICE,"aligment output did not end with ]:\n%s\n",output);
156
/*
Handle a task returned by the work queue.  A task with a non-zero return
status is logged and submitted to the queue again.  Otherwise its output is
passed through confirm_output(), the cleaned text is appended to output_file,
the runtime and transfer-time totals are updated, and the task is deleted.
NOTE(review): some control-flow lines are not visible in this excerpt; the
second work_queue_submit() presumably runs only when confirm_output() rejects
the output -- TODO confirm.
*/
static void task_complete( struct work_queue_task *t )
158
if(t->return_status!=0) {
159
debug(D_NOTICE,"task failed with status %d on host %s\n",t->return_status,t->host);
160
/* Resubmit the same task object. */
work_queue_submit(queue,t);
164
char *clean_output = confirm_output(t->output);
166
work_queue_submit(queue,t);
170
fprintf(output_file,"%s",clean_output);
174
/* Accumulate totals that display_progress() reads. */
tasks_runtime += (t->finish_time - t->start_time);
175
tasks_filetime += t->total_transfer_time;
177
work_queue_task_delete(t);
180
/*
Read one candidate line from fp and split it into name1, name2 and extra_data.
Returns CANDIDATE_SUCCESS when a complete line with three fields was parsed,
CANDIDATE_EOF when the line is exactly "EOF", and CANDIDATE_WAIT when no
complete line is available yet.  A line with fewer than three fields is
reported through fatal().
NOTE(review): the %s conversions are unbounded; the input line is limited to
CAND_FILE_LINE_MAX bytes, so the output buffers presumably need to be at
least that large -- verify against callers.
*/
static int candidate_read(FILE * fp, char *name1, char *name2, char *extra_data)
182
char line[CAND_FILE_LINE_MAX];
186
/* Remember where this line starts so a partial line can be re-read later. */
long start_of_line = ftell(fp);
188
if(!fgets(line, CAND_FILE_LINE_MAX, fp)) return CANDIDATE_WAIT;
190
/* No trailing newline: rewind to the start of the line and report that more data is needed. */
if(line[strlen(line)-1]!='\n') {
191
fseek(fp,start_of_line,SEEK_SET);
192
return CANDIDATE_WAIT;
195
/* A line consisting only of "EOF" marks the end of the candidates. */
if(!strcmp(line,"EOF\n")) {
197
return CANDIDATE_EOF;
200
/* Two whitespace-delimited names, then the remainder of the line. */
int n = sscanf(line, "%s %s %[^\n]", name1, name2, extra_data);
201
if(n!=3) fatal("candidate file is corrupted: %s\n",line);
205
return CANDIDATE_SUCCESS;
208
/*
Find the sequence called name.  The hash table h is consulted first; further
sequences are read from sequence_file with cseq_read(), each one is inserted
into h under its own name, and the one whose name matches is returned.
If the name is never found, fatal() is called.
NOTE(review): the loop and the cache-hit test are not visible in this excerpt;
presumably reading only happens when the table lookup misses -- TODO confirm.
*/
struct cseq * sequence_lookup( struct hash_table *h, const char *name )
210
struct cseq *c = hash_table_lookup(h,name);
214
c = cseq_read(sequence_file);
219
/* Every sequence read is kept in the table, not only the one requested. */
hash_table_insert(h,c->name,c);
220
if(!strcmp(name,c->name)) return c;
222
/* Log a progress message each time the table size reaches a multiple of 100000. */
int size = hash_table_size(h);
223
if(size%100000 ==0 )debug(D_DEBUG,"loaded %d sequences",size);
226
fatal("candidate file contains invalid sequence name: %s\n",name);
230
/*
Make sure *buffer can hold buffer_used + buffer_delta bytes.  When that total
exceeds *buffer_size, the size is increased in a loop until it is large enough
and the buffer is reallocated to the new size.
NOTE(review): the statement that increases *buffer_size is not visible in this
excerpt.  The result of realloc() is stored straight into *buffer and is not
checked here, so an allocation failure would lose the original pointer.
*/
static void buffer_ensure( char **buffer, int *buffer_size, int buffer_used, int buffer_delta )
232
int buffer_needed = buffer_used + buffer_delta;
234
if(buffer_needed>*buffer_size) {
237
} while( buffer_needed > *buffer_size );
239
*buffer = realloc(*buffer,*buffer_size);
243
/*
Build one work queue task from the next candidates in candidate_file.
Returns 0 when the first candidate cannot be read.  Otherwise the sequences of
each candidate pair are printed into a growing buffer until candidate_read()
stops succeeding or max_pairs_per_task pairs have been collected.  The task
runs "./align <align_prog_args> aligndata", with align_prog supplied as the
input file "align" and the buffer supplied as the input "aligndata".
NOTE(review): declarations of buffer_pos, npairs and nseqs, the loop head and
the return statement are not visible in this excerpt.
*/
static struct work_queue_task * task_create( struct hash_table *sequence_table )
245
/* Fields of the current candidate (a*) and of the one just read (b*). */
char aname1[CAND_FILE_LINE_MAX];
246
char aname2[CAND_FILE_LINE_MAX];
247
char aextra[CAND_FILE_LINE_MAX];
249
char bname1[CAND_FILE_LINE_MAX];
250
char bname2[CAND_FILE_LINE_MAX];
251
char bextra[CAND_FILE_LINE_MAX];
253
struct cseq *s1, *s2;
255
int result = candidate_read(candidate_file,aname1,aname2,aextra);
256
if(result!=CANDIDATE_SUCCESS) return 0;
258
s1 = sequence_lookup(sequence_table,aname1);
259
s2 = sequence_lookup(sequence_table,aname2);
261
/* static: the size is remembered between calls; buffer_ensure() receives its address. */
static int buffer_size = 1024;
262
/* NOTE(review): the result of malloc() is not checked on the lines visible here. */
char *buffer = malloc(buffer_size);
265
/* Reserve room for both sequences plus a small margin before printing them. */
buffer_ensure(&buffer,&buffer_size,buffer_pos,cseq_size(s1)+cseq_size(s2)+10);
267
buffer_pos += cseq_sprint(&buffer[buffer_pos],s1,"");
268
buffer_pos += cseq_sprint(&buffer[buffer_pos],s2,aextra);
274
result = candidate_read(candidate_file,bname1,bname2,bextra);
275
if(result!=CANDIDATE_SUCCESS) break;
277
s1 = sequence_lookup(sequence_table,bname1);
278
s2 = sequence_lookup(sequence_table,bname2);
280
/* The first name changed: print a null sequence (presumably a group separator -- TODO confirm), then the new first sequence, and make this candidate the current one. */
if(strcmp(aname1,bname1)!=0) {
281
buffer_ensure(&buffer,&buffer_size,buffer_pos,cseq_size(s1)+cseq_size(s2)+10);
282
buffer_pos += cseq_sprint(&buffer[buffer_pos],0,"");
283
buffer_pos += cseq_sprint(&buffer[buffer_pos],s1,"");
284
strcpy(aname1,bname1);
285
strcpy(aname2,bname2);
286
strcpy(aextra,bextra);
290
/* Append the second sequence together with this candidate's extra data. */
buffer_ensure(&buffer,&buffer_size,buffer_pos,cseq_size(s2)+10);
291
buffer_pos += cseq_sprint(&buffer[buffer_pos],s2,bextra);
296
} while( npairs < max_pairs_per_task );
298
debug(D_DEBUG,"created task of %d sequences and %d comparisons\n",nseqs,npairs);
300
/* Sized from the full program path although only the fixed name "align" is printed below, so there is spare room. */
char cmd[strlen(align_prog)+strlen(align_prog_args)+100];
302
sprintf(cmd, "./%s %s aligndata", "align", align_prog_args);
304
struct work_queue_task *t = work_queue_task_create(cmd);
305
work_queue_task_specify_input_file(t, align_prog, "align");
306
work_queue_task_specify_input_buf(t, buffer, buffer_pos, "aligndata");
313
/*
Program entry point.  Parses the command-line options, locates the alignment
program on PATH, opens the sequence, candidate and output files, creates the
work queue and the sequence table, and then loops creating tasks and
collecting results until there are no more candidates and the queue is empty.
NOTE(review): many lines (the switch over options, error exits, and parts of
the main loop) are not visible in this excerpt.
*/
int main(int argc, char *argv[])
317
const char *progname = "sand_align_master";
319
debug_config(progname);
321
// By default, turn on fast abort option since we know each job is of very similar size (in terms of runtime).
322
// One can also set the fast_abort_multiplier by the '-F' option.
323
wq_option_fast_abort_multiplier = 10;
325
/* NOTE(review): getopt() returns an int; comparing against (char) -1 relies on the type of c, whose declaration is not visible here -- TODO confirm. */
while((c = getopt(argc, argv, "e:F:p:n:d:o:vh")) != (char) -1) {
331
max_pairs_per_task = atoi(optarg);
334
align_prog_args = strdup(optarg);
337
debug_flags_set(optarg);
340
wq_option_fast_abort_multiplier = atof(optarg);
343
debug_config_file(optarg);
346
show_version(progname);
357
/* Exactly four positional arguments are expected after the options. */
if((argc - optind) != 4) {
362
/* Resolve the alignment program through PATH into align_prog. */
if(!find_executable(argv[optind],"PATH",align_prog,sizeof(align_prog))) {
363
fprintf(stderr, "%s: couldn't find alignment program %s: is it in your path?\n",progname,argv[optind]);
368
candidate_file_name = argv[optind + 1];
369
sequence_file_name = argv[optind + 2];
370
output_file_name = argv[optind + 3];
372
sequence_file = fopen(sequence_file_name,"r");
374
fprintf(stderr, "%s: couldn't open sequence file %s: %s\n", progname, sequence_file_name, strerror(errno));
378
candidate_file = fopen(candidate_file_name,"r");
379
if(!candidate_file) {
380
fprintf(stderr, "%s: couldn't open candidate file %s: %s\n", progname,candidate_file_name, strerror(errno));
384
/* Opened in append mode, so existing contents of the output file are kept. */
output_file = fopen(output_file_name, "a");
386
fprintf(stderr, "%s: couldn't open output file %s: %s\n", progname,output_file_name, strerror(errno));
390
queue = work_queue_create(port);
392
fprintf(stderr, "%s: couldn't listen on port %d: %s\n",progname,port,strerror(errno));
396
sequence_table = hash_table_create(20000001,0);
398
start_time = time(0);
400
struct work_queue_task *t;
402
/* Keep going while candidates remain to be read or tasks are still in the queue. */
while( more_candidates || !work_queue_empty(queue) ) {
404
/* Refresh the progress table at most once per second. */
if(last_display_time < time(0))
405
display_progress(queue);
407
/* Create and submit tasks for as long as the queue reports it wants more. */
while( more_candidates && work_queue_hungry(queue) ) {
408
t = task_create( sequence_table );
410
work_queue_submit(queue,t);
417
/* Nothing in the queue: pause before checking for candidates again. */
if(work_queue_empty(queue)) {
418
if(more_candidates) sleep(5);
420
/* A hungry queue is polled without waiting; otherwise wait up to 5 seconds for a result. */
if(work_queue_hungry(queue)) {
421
t = work_queue_wait(queue,0);
423
t = work_queue_wait(queue,5);
425
if(t) task_complete(t);
429
display_progress(queue);
431
printf("Completed %i tasks in %i seconds\n", tasks_done, (int) (time(0) - start_time));
434
fclose(candidate_file);
436
work_queue_delete(queue);