~ubuntu-branches/ubuntu/vivid/cctools/vivid

« back to all changes in this revision

Viewing changes to sand/src/sand_align_master.c

  • Committer: Bazaar Package Importer
  • Author(s): Michael Hanke
  • Date: 2011-05-07 09:05:00 UTC
  • Revision ID: james.westby@ubuntu.com-20110507090500-lqpmdtwndor6e7os
Tags: upstream-3.3.2
ImportĀ upstreamĀ versionĀ 3.3.2

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
 
 
2
/*
 
3
Copyright (C) 2009- The University of Notre Dame
 
4
This software is distributed under the GNU General Public License.
 
5
See the file COPYING for details.
 
6
*/
 
7
 
 
8
#include <stdio.h>
 
9
#include <unistd.h>
 
10
#include <stdlib.h>
 
11
#include <string.h>
 
12
#include <errno.h>
 
13
#include <string.h>
 
14
#include <time.h>
 
15
#include <ctype.h>
 
16
#include <sys/stat.h>
 
17
 
 
18
#include "debug.h"
 
19
#include "work_queue.h"
 
20
#include "hash_table.h"
 
21
#include "stringtools.h"
 
22
#include "macros.h"
 
23
#include "envtools.h"
 
24
 
 
25
#include "sequence.h"
 
26
#include "compressed_sequence.h"
 
27
 
 
28
static struct work_queue *queue = 0;
 
29
static struct hash_table *sequence_table = 0;
 
30
static int port = WORK_QUEUE_DEFAULT_PORT;
 
31
static char align_prog[1024];
 
32
static const char *align_prog_args = "";
 
33
static const char *candidate_file_name;
 
34
static const char *sequence_file_name;
 
35
static const char *output_file_name;
 
36
 
 
37
static FILE *sequence_file;
 
38
static FILE *candidate_file;
 
39
static FILE *output_file;
 
40
 
 
41
static time_t start_time = 0;
 
42
static time_t last_display_time = 0;
 
43
 
 
44
static int more_candidates = 1;
 
45
static int tasks_submitted = 0;
 
46
static int tasks_done = 0;
 
47
static timestamp_t tasks_runtime = 0;
 
48
static timestamp_t tasks_filetime = 0;
 
49
static int candidates_loaded = 0;
 
50
static int sequences_loaded = 0;
 
51
 
 
52
static int max_pairs_per_task = 10000;
 
53
 
 
54
#define CANDIDATE_SUCCESS 0
 
55
#define CANDIDATE_EOF 1
 
56
#define CANDIDATE_WAIT 2
 
57
 
 
58
#define CAND_FILE_LINE_MAX 4096
 
59
 
 
60
#define unsigned_isspace(c) isspace((unsigned char) c)
 
61
 
 
62
static void show_version(const char *cmd)
 
63
{
 
64
        printf("%s version %d.%d.%d built by %s@%s on %s at %s\n", cmd, CCTOOLS_VERSION_MAJOR, CCTOOLS_VERSION_MINOR, CCTOOLS_VERSION_MICRO, BUILD_USER, BUILD_HOST, __DATE__, __TIME__);
 
65
}
 
66
 
 
67
static void show_help(const char *cmd)
 
68
{
 
69
        printf("Use: %s [options] <sand_align_kernel> <candidates.cand> <sequences.cfa> <overlaps.ovl>\n", cmd);
 
70
        printf("where options are:\n");
 
71
        printf(" -p <port>      Port number for work queue master to listen on. (default: %d)\n",port);
 
72
        printf(" -n <number>    Maximum number of candidates per task. (default is %d)\n",max_pairs_per_task);
 
73
        printf(" -e <args>      Extra arguments to pass to the alignment program.\n");
 
74
        printf(" -d <subsystem> Enable debugging for this subsystem.  (Try -d all to start.)\n");
 
75
        printf(" -F <#>         Work Queue fast abort multiplier.     (default is 10.)\n");
 
76
        printf(" -o <file>      Send debugging to this file.\n");
 
77
        printf(" -v             Show version string\n");
 
78
        printf(" -h             Show this help screen\n");
 
79
}
 
80
 
 
81
static void display_progress(struct work_queue *q)
 
82
{
 
83
        struct work_queue_stats info;
 
84
        static int row_count = 0;
 
85
        int row_limit = 25;
 
86
 
 
87
        work_queue_get_stats(q, &info);
 
88
 
 
89
        if(row_count==0) {
 
90
                printf(" Total | Workers   | Tasks                      Avg | K-Cand K-Seqs | Total\n");
 
91
                printf("  Time | Idle Busy | Submit Idle  Run   Done   Time | Loaded Loaded | Speedup\n");
 
92
                row_count = row_limit;
 
93
        }
 
94
 
 
95
        printf("%6d | %4d %4d | %6d %4d %4d %6d %6.2lf | %6d %6d | %5.2lf\n",
 
96
                (int) (time(0) - start_time),
 
97
                info.workers_init + info.workers_ready,
 
98
                info.workers_busy,
 
99
                tasks_submitted,
 
100
                info.tasks_waiting,
 
101
                info.tasks_running,
 
102
                tasks_done,
 
103
                tasks_done ? tasks_runtime / (double) tasks_done / 1000000.0 : 0,
 
104
                candidates_loaded / 1000,
 
105
                sequences_loaded / 1000,
 
106
                (time(0)>start_time) ? (tasks_runtime/1000000.0) / (time(0)-start_time) : 0);
 
107
 
 
108
        row_count--;
 
109
 
 
110
        last_display_time = time(0);
 
111
        fflush(stdout);
 
112
}
 
113
 
 
114
/*
 
115
Check to see that the output consists of an envelope [ ... ]
 
116
around some OVL records.  If there are no good matches in the
 
117
output, we should see an envelope with nothing in it.
 
118
If the file is completely empty, then there is a problem and
 
119
we reject the output.
 
120
*/
 
121
 
 
122
static char * confirm_output( char *output )
 
123
{
 
124
        char *s = output;
 
125
        char *result = 0;
 
126
 
 
127
        while(unsigned_isspace(*s)) s++;
 
128
 
 
129
        if(*s!='[') {
 
130
                debug(D_NOTICE,"aligment output did not begin with [:\n%s\n",output);
 
131
                return 0;
 
132
        }
 
133
 
 
134
        s++;
 
135
 
 
136
        while(unsigned_isspace(*s)) s++;
 
137
 
 
138
        result = s;
 
139
 
 
140
        while(*s) s++;
 
141
 
 
142
        s--;
 
143
 
 
144
        while(unsigned_isspace(*s)) s--;
 
145
 
 
146
        if(*s!=']') {
 
147
                debug(D_NOTICE,"aligment output did not end with ]:\n%s\n",output);
 
148
                return 0;
 
149
        }
 
150
 
 
151
        *s = 0;
 
152
 
 
153
        return result;
 
154
}
 
155
 
 
156
static void task_complete( struct work_queue_task *t )
 
157
{
 
158
        if(t->return_status!=0) {
 
159
                debug(D_NOTICE,"task failed with status %d on host %s\n",t->return_status,t->host);
 
160
                work_queue_submit(queue,t);
 
161
                return;
 
162
        }
 
163
 
 
164
        char *clean_output = confirm_output(t->output);
 
165
        if(!clean_output) {
 
166
                work_queue_submit(queue,t);
 
167
                return;
 
168
        }
 
169
 
 
170
        fprintf(output_file,"%s",clean_output);
 
171
        fflush(output_file);
 
172
 
 
173
        tasks_done++;
 
174
        tasks_runtime += (t->finish_time - t->start_time);
 
175
        tasks_filetime += t->total_transfer_time;
 
176
 
 
177
        work_queue_task_delete(t);
 
178
}
 
179
 
 
180
static int candidate_read(FILE * fp, char *name1, char *name2, char *extra_data)
 
181
{
 
182
        char line[CAND_FILE_LINE_MAX];
 
183
 
 
184
        clearerr(fp);
 
185
 
 
186
        long start_of_line = ftell(fp);
 
187
 
 
188
        if(!fgets(line, CAND_FILE_LINE_MAX, fp)) return CANDIDATE_WAIT;
 
189
 
 
190
        if(line[strlen(line)-1]!='\n') {
 
191
                fseek(fp,start_of_line,SEEK_SET);
 
192
                return CANDIDATE_WAIT;
 
193
        }
 
194
 
 
195
        if(!strcmp(line,"EOF\n")) {
 
196
                more_candidates = 0;
 
197
                return CANDIDATE_EOF;
 
198
        }
 
199
 
 
200
        int n = sscanf(line, "%s %s %[^\n]", name1, name2, extra_data);
 
201
        if(n!=3) fatal("candidate file is corrupted: %s\n",line);
 
202
 
 
203
        candidates_loaded++;
 
204
 
 
205
        return CANDIDATE_SUCCESS;
 
206
}
 
207
 
 
208
struct cseq * sequence_lookup( struct hash_table *h, const char *name )
 
209
{
 
210
        struct cseq *c = hash_table_lookup(h,name);
 
211
        if(c) return c;
 
212
 
 
213
        while(1) {
 
214
                c = cseq_read(sequence_file);
 
215
                if(!c) break;
 
216
 
 
217
                sequences_loaded++;
 
218
 
 
219
                hash_table_insert(h,c->name,c);
 
220
                if(!strcmp(name,c->name)) return c;
 
221
 
 
222
                int size = hash_table_size(h);
 
223
                if(size%100000 ==0 )debug(D_DEBUG,"loaded %d sequences",size);
 
224
        }
 
225
 
 
226
        fatal("candidate file contains invalid sequence name: %s\n",name);
 
227
        return 0;
 
228
}
 
229
 
 
230
static void buffer_ensure( char **buffer, int *buffer_size, int buffer_used, int buffer_delta )
 
231
{
 
232
        int buffer_needed = buffer_used + buffer_delta;
 
233
 
 
234
        if(buffer_needed>*buffer_size) {
 
235
                do {
 
236
                        *buffer_size *=2 ;
 
237
                } while( buffer_needed > *buffer_size );
 
238
 
 
239
                *buffer = realloc(*buffer,*buffer_size);
 
240
        }
 
241
}
 
242
 
 
243
static struct work_queue_task * task_create( struct hash_table *sequence_table )
 
244
{
 
245
        char aname1[CAND_FILE_LINE_MAX];
 
246
        char aname2[CAND_FILE_LINE_MAX];
 
247
        char aextra[CAND_FILE_LINE_MAX];
 
248
 
 
249
        char bname1[CAND_FILE_LINE_MAX];
 
250
        char bname2[CAND_FILE_LINE_MAX];
 
251
        char bextra[CAND_FILE_LINE_MAX];
 
252
 
 
253
        struct cseq *s1, *s2;
 
254
 
 
255
        int result = candidate_read(candidate_file,aname1,aname2,aextra);
 
256
        if(result!=CANDIDATE_SUCCESS) return 0;
 
257
 
 
258
        s1 = sequence_lookup(sequence_table,aname1);
 
259
        s2 = sequence_lookup(sequence_table,aname2);
 
260
 
 
261
        static int buffer_size = 1024;
 
262
        char *buffer = malloc(buffer_size);
 
263
        int buffer_pos = 0;
 
264
 
 
265
        buffer_ensure(&buffer,&buffer_size,buffer_pos,cseq_size(s1)+cseq_size(s2)+10);
 
266
 
 
267
        buffer_pos += cseq_sprint(&buffer[buffer_pos],s1,"");
 
268
        buffer_pos += cseq_sprint(&buffer[buffer_pos],s2,aextra);
 
269
 
 
270
        int npairs = 1;
 
271
        int nseqs = 2;
 
272
 
 
273
        do {
 
274
                result = candidate_read(candidate_file,bname1,bname2,bextra);
 
275
                if(result!=CANDIDATE_SUCCESS) break;
 
276
 
 
277
                s1 = sequence_lookup(sequence_table,bname1);
 
278
                s2 = sequence_lookup(sequence_table,bname2);
 
279
 
 
280
                if(strcmp(aname1,bname1)!=0) {
 
281
                        buffer_ensure(&buffer,&buffer_size,buffer_pos,cseq_size(s1)+cseq_size(s2)+10);
 
282
                        buffer_pos += cseq_sprint(&buffer[buffer_pos],0,"");
 
283
                        buffer_pos += cseq_sprint(&buffer[buffer_pos],s1,"");
 
284
                        strcpy(aname1,bname1);
 
285
                        strcpy(aname2,bname2);
 
286
                        strcpy(aextra,bextra);
 
287
                        nseqs++;
 
288
                }
 
289
 
 
290
                buffer_ensure(&buffer,&buffer_size,buffer_pos,cseq_size(s2)+10);
 
291
                buffer_pos += cseq_sprint(&buffer[buffer_pos],s2,bextra);
 
292
 
 
293
                nseqs++;
 
294
                npairs++;
 
295
 
 
296
        } while( npairs < max_pairs_per_task );
 
297
 
 
298
        debug(D_DEBUG,"created task of %d sequences and %d comparisons\n",nseqs,npairs);
 
299
 
 
300
        char cmd[strlen(align_prog)+strlen(align_prog_args)+100];
 
301
 
 
302
        sprintf(cmd, "./%s %s aligndata", "align", align_prog_args);
 
303
 
 
304
        struct work_queue_task *t = work_queue_task_create(cmd);
 
305
        work_queue_task_specify_input_file(t, align_prog, "align");
 
306
        work_queue_task_specify_input_buf(t, buffer, buffer_pos, "aligndata");
 
307
 
 
308
        free(buffer);
 
309
 
 
310
        return t;
 
311
}
 
312
 
 
313
int main(int argc, char *argv[])
 
314
{
 
315
        char c;
 
316
 
 
317
        const char *progname = "sand_align_master";
 
318
 
 
319
        debug_config(progname);
 
320
 
 
321
        // By default, turn on fast abort option since we know each job is of very similar size (in terms of runtime).
 
322
        // One can also set the fast_abort_multiplier by the '-f' option.
 
323
        wq_option_fast_abort_multiplier = 10;
 
324
 
 
325
        while((c = getopt(argc, argv, "e:F:p:n:d:o:vh")) != (char) -1) {
 
326
                switch (c) {
 
327
                case 'p':
 
328
                        port = atoi(optarg);
 
329
                        break;
 
330
                case 'n':
 
331
                        max_pairs_per_task = atoi(optarg);
 
332
                        break;
 
333
                case 'e':
 
334
                        align_prog_args = strdup(optarg);
 
335
                        break;
 
336
                case 'd':
 
337
                        debug_flags_set(optarg);
 
338
                        break;
 
339
                case 'F':
 
340
                        wq_option_fast_abort_multiplier = atof(optarg);
 
341
                        break;
 
342
                case 'o':
 
343
                        debug_config_file(optarg);
 
344
                        break;
 
345
                case 'v':
 
346
                        show_version(progname);
 
347
                        exit(0);
 
348
                        break;
 
349
                case 'h':
 
350
                        show_help(progname);
 
351
                        exit(0);
 
352
                        break;
 
353
                }
 
354
        }
 
355
 
 
356
 
 
357
        if((argc - optind) != 4) {
 
358
                show_help(progname);
 
359
                exit(1);
 
360
        }
 
361
 
 
362
        if(!find_executable(argv[optind],"PATH",align_prog,sizeof(align_prog))) {
 
363
                fprintf(stderr, "%s: couldn't find alignment program %s: is it in your path?\n",progname,argv[optind]);
 
364
                return 1;
 
365
        }
 
366
                        
 
367
 
 
368
        candidate_file_name = argv[optind + 1];
 
369
        sequence_file_name = argv[optind + 2];
 
370
        output_file_name = argv[optind + 3];
 
371
 
 
372
        sequence_file = fopen(sequence_file_name,"r");
 
373
        if(!sequence_file) {
 
374
                fprintf(stderr, "%s: couldn't open sequence file %s: %s\n", progname, sequence_file_name, strerror(errno));
 
375
                return 1;
 
376
        }
 
377
 
 
378
        candidate_file = fopen(candidate_file_name,"r");
 
379
        if(!candidate_file) {
 
380
                fprintf(stderr, "%s: couldn't open candidate file %s: %s\n", progname,candidate_file_name, strerror(errno));
 
381
                return 1;
 
382
        }
 
383
 
 
384
        output_file = fopen(output_file_name, "a");
 
385
        if(!output_file) {
 
386
                fprintf(stderr, "%s: couldn't open output file %s: %s\n", progname,output_file_name, strerror(errno));
 
387
                return 1;
 
388
        }
 
389
 
 
390
        queue = work_queue_create(port);
 
391
        if(!queue) {
 
392
                fprintf(stderr, "%s: couldn't listen on port %d: %s\n",progname,port,strerror(errno));
 
393
                return 1;
 
394
        }
 
395
 
 
396
        sequence_table = hash_table_create(20000001,0);
 
397
 
 
398
        start_time = time(0);
 
399
 
 
400
        struct work_queue_task *t;
 
401
 
 
402
        while( more_candidates || !work_queue_empty(queue) ) {
 
403
 
 
404
                if(last_display_time < time(0))
 
405
                        display_progress(queue);
 
406
 
 
407
                while( more_candidates && work_queue_hungry(queue) ) {
 
408
                        t = task_create( sequence_table );
 
409
                        if(t) {
 
410
                                work_queue_submit(queue,t);
 
411
                                tasks_submitted++;
 
412
                        } else {
 
413
                                break;
 
414
                        }
 
415
                }
 
416
 
 
417
                if(work_queue_empty(queue)) {
 
418
                        if(more_candidates) sleep(5);
 
419
                } else {
 
420
                        if(work_queue_hungry(queue)) {
 
421
                                t = work_queue_wait(queue,0);
 
422
                        } else {
 
423
                                t = work_queue_wait(queue,5);
 
424
                        }
 
425
                        if(t) task_complete(t);
 
426
                }
 
427
        }
 
428
 
 
429
        display_progress(queue);
 
430
 
 
431
        printf("Completed %i tasks in %i seconds\n", tasks_done, (int) (time(0) - start_time));
 
432
 
 
433
        fclose(output_file);
 
434
        fclose(candidate_file);
 
435
 
 
436
        work_queue_delete(queue);
 
437
 
 
438
        return 0;
 
439
}