~ubuntu-branches/ubuntu/vivid/cctools/vivid

« back to all changes in this revision

Viewing changes to allpairs/src/allpairs_master.c

  • Committer: Bazaar Package Importer
  • Author(s): Michael Hanke
  • Date: 2011-05-07 09:05:00 UTC
  • Revision ID: james.westby@ubuntu.com-20110507090500-lqpmdtwndor6e7os
Tags: upstream-3.3.2
ImportĀ upstreamĀ versionĀ 3.3.2

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
Copyright (C) 2008- The University of Notre Dame
 
3
This software is distributed under the GNU General Public License.
 
4
See the file COPYING for details.
 
5
*/
 
6
 
 
7
#include <stdio.h>
 
8
#include <unistd.h>
 
9
#include <stdlib.h>
 
10
#include <string.h>
 
11
#include <errno.h>
 
12
 
 
13
#include "debug.h"
 
14
#include "work_queue.h"
 
15
#include "envtools.h"
 
16
#include "text_list.h"
 
17
#include "hash_table.h"
 
18
#include "stringtools.h"
 
19
#include "xmalloc.h"
 
20
#include "macros.h"
 
21
#include "envtools.h"
 
22
#include "fast_popen.h"
 
23
#include "list.h"
 
24
 
 
25
#include "allpairs_compare.h"
 
26
 
 
27
#define ALLPAIRS_LINE_MAX 4096
 
28
 
 
29
static const char *progname = "allpairs_master";
 
30
static char allpairs_multicore_program[ALLPAIRS_LINE_MAX] = "allpairs_multicore";
 
31
static char allpairs_compare_program[ALLPAIRS_LINE_MAX];
 
32
 
 
33
static double compare_program_time = 0.0;
 
34
static const char * extra_arguments = "";
 
35
static int use_external_program = 0;
 
36
static struct list *extra_files_list = 0;
 
37
 
 
38
static int xcurrent = 0;
 
39
static int ycurrent = 0;
 
40
static int xblock = 0;
 
41
static int yblock = 0;
 
42
static int xstop = 0;
 
43
static int ystop = 0;
 
44
 
 
45
static void show_version(const char *cmd)
 
46
{
 
47
        printf("%s version %d.%d.%d built by %s@%s on %s at %s\n", cmd, CCTOOLS_VERSION_MAJOR, CCTOOLS_VERSION_MINOR, CCTOOLS_VERSION_MICRO, BUILD_USER, BUILD_HOST, __DATE__, __TIME__);
 
48
}
 
49
 
 
50
static void show_help(const char *cmd)
 
51
{
 
52
        printf("Usage: %s [options] <set A> <set B> <compare function>\n", cmd);
 
53
        printf("The most common options are:\n");
 
54
        printf(" -p <port>      The port that the master will be listening on.\n");
 
55
        printf(" -e <args>      Extra arguments to pass to the comparison function.\n");
 
56
        printf(" -f <file>      Extra input file needed by the comparison function.  (may be given multiple times)\n");
 
57
        printf(" -t <seconds>   Estimated time to run one comparison.  (default chosen at runtime)\n");
 
58
        printf(" -x <items>     Width of one work unit, in items to compare.  (default chosen at runtime)\n");
 
59
        printf(" -y <items>     Height of one work unit, in items to compare.  (default chosen at runtime)\n");
 
60
        printf(" -N <project>   Report the master information to a catalog server with the project name - <project>\n");
 
61
        printf(" -E <priority>  Priority. Higher the value, higher the priority.\n");
 
62
        printf(" -d <flag>      Enable debugging for this subsystem.  (Try -d all to start.)\n");
 
63
        printf(" -v             Show program version.\n");
 
64
        printf(" -h             Display this message.\n");
 
65
}
 
66
 
 
67
/*
 
68
Run the comparison program repeatedly until five seconds have elapsed,
 
69
in order to get a rough measurement of the execution time.
 
70
No very accurate for embedded functions. 
 
71
*/
 
72
 
 
73
double estimate_run_time( struct text_list *seta, struct text_list *setb )
 
74
{
 
75
        char line[ALLPAIRS_LINE_MAX];
 
76
        int loops=0;
 
77
        time_t starttime, stoptime;
 
78
 
 
79
        fprintf(stderr, "%s: sampling execution time of %s...\n",progname,allpairs_compare_program);
 
80
 
 
81
        sprintf(line,"./%s %s %s %s",
 
82
                string_basename(allpairs_compare_program),
 
83
                extra_arguments,
 
84
                text_list_get(seta,0),
 
85
                text_list_get(setb,0)
 
86
                );
 
87
 
 
88
        starttime = time(0);
 
89
 
 
90
        do {
 
91
                FILE *file = fast_popen(line);
 
92
                if(!file) {
 
93
                        fprintf(stderr,"%s: couldn't execute %s: %s\n",progname,line,strerror(errno));
 
94
                        exit(1);
 
95
                }
 
96
 
 
97
                while(fgets(line,sizeof(line),file)) {
 
98
                        fprintf(stderr,"%s",line);
 
99
                }
 
100
 
 
101
                fast_pclose(file);
 
102
 
 
103
                stoptime = time(0);
 
104
                loops++;
 
105
        } while( (stoptime-starttime) < 5 );
 
106
 
 
107
        double t = (stoptime - starttime) / loops;
 
108
 
 
109
        if(t<0.01) t = 0.01;
 
110
 
 
111
        return  t;  
 
112
}
 
113
 
 
114
/*
 
115
After measuring the function run time, try to choose a
 
116
squarish work unit that will take just over one minute to complete.
 
117
*/
 
118
 
 
119
void estimate_block_size( struct text_list *seta, struct text_list *setb, int *xblock, int *yblock )
 
120
{
 
121
        if(compare_program_time==0) {
 
122
                if(use_external_program) {
 
123
                        compare_program_time = estimate_run_time(seta,setb);
 
124
                } else {
 
125
                        compare_program_time = 0.1;
 
126
                }
 
127
        }
 
128
 
 
129
        fprintf(stderr, "%s: %s estimated at %.02lfs per comparison\n",progname,allpairs_compare_program,compare_program_time);
 
130
 
 
131
        int block_limit = 60;
 
132
        double block_time;
 
133
 
 
134
        *xblock = *yblock = 1;
 
135
 
 
136
        while(1) {
 
137
                block_time = *xblock * *yblock * compare_program_time;
 
138
                if(block_time>block_limit) break;
 
139
 
 
140
                if(*xblock < text_list_size(seta)) (*xblock)++;
 
141
                if(*yblock < text_list_size(setb)) (*yblock)++;
 
142
 
 
143
                if(*xblock==text_list_size(seta) && *yblock==text_list_size(setb)) break;
 
144
        }
 
145
}
 
146
 
 
147
/*
 
148
Convert a text_list object into a single string that we can
 
149
pass as a buffer to a remote task via work queue.
 
150
*/
 
151
 
 
152
char * text_list_string( struct text_list *t, int a, int b )
 
153
{
 
154
        static int buffer_size = 128;
 
155
        char *buffer = malloc(buffer_size);
 
156
        int buffer_pos = 0;
 
157
        int i;
 
158
                
 
159
        for(i=a;i<b;i++) {
 
160
                const char *str = text_list_get(t,i);
 
161
                if(!str) break;
 
162
                str = string_basename(str);
 
163
                while((strlen(str) + buffer_pos + 3)>= buffer_size) {
 
164
                        buffer_size *= 2;
 
165
                        buffer = realloc(buffer,buffer_size);
 
166
                }
 
167
                buffer_pos += sprintf(&buffer[buffer_pos],"%s\n",str);
 
168
        }
 
169
 
 
170
        buffer[buffer_pos] = 0;
 
171
 
 
172
        return buffer;
 
173
}
 
174
 
 
175
/*
 
176
Create the next task in order to be submitted to the work queue.
 
177
Basically, bump the current position in the results matrix by
 
178
xblock, yblock, and then construct a task with a list of files
 
179
on each axis, and attach the necessary files.
 
180
*/
 
181
 
 
182
struct work_queue_task * task_create( struct text_list *seta, struct text_list *setb )
 
183
{
 
184
        int x,y;
 
185
        char *buf, *name;
 
186
 
 
187
        if(xcurrent>=xstop) {
 
188
                xcurrent=0;
 
189
                ycurrent+=yblock;
 
190
        }
 
191
 
 
192
        if(ycurrent>=ystop) return 0;
 
193
 
 
194
        char cmd[ALLPAIRS_LINE_MAX];
 
195
        sprintf(cmd,"./%s -e \"%s\" A B %s%s",string_basename(allpairs_multicore_program),extra_arguments,use_external_program ? "./" : "",string_basename(allpairs_compare_program));
 
196
        struct work_queue_task *task = work_queue_task_create(cmd);
 
197
 
 
198
        if(use_external_program) {
 
199
                work_queue_task_specify_file(task,allpairs_compare_program,string_basename(allpairs_compare_program),WORK_QUEUE_INPUT,WORK_QUEUE_CACHE);
 
200
        }
 
201
 
 
202
        work_queue_task_specify_file(task,allpairs_multicore_program,string_basename(allpairs_multicore_program),WORK_QUEUE_INPUT,WORK_QUEUE_CACHE);
 
203
 
 
204
        const char *f;
 
205
        list_first_item(extra_files_list);
 
206
        while((f = list_next_item(extra_files_list))) {
 
207
                work_queue_task_specify_file(task,f,string_basename(f),WORK_QUEUE_INPUT,WORK_QUEUE_CACHE);
 
208
        }
 
209
 
 
210
        buf = text_list_string(seta,xcurrent,xcurrent+xblock);
 
211
        work_queue_task_specify_buffer(task,buf,strlen(buf),"A",WORK_QUEUE_NOCACHE);
 
212
        free(buf);
 
213
 
 
214
        buf = text_list_string(setb,ycurrent,ycurrent+yblock);
 
215
        work_queue_task_specify_buffer(task,buf,strlen(buf),"B",WORK_QUEUE_NOCACHE);
 
216
        free(buf);
 
217
        
 
218
        for(x=xcurrent;x<(xcurrent+xblock);x++) {
 
219
                name = text_list_get(seta,x);
 
220
                if(!name) break;
 
221
                work_queue_task_specify_file(task,name,string_basename(name),WORK_QUEUE_INPUT,WORK_QUEUE_CACHE);
 
222
        }
 
223
 
 
224
        for(y=ycurrent;y<(ycurrent+yblock);y++) {
 
225
                name = text_list_get(setb,y);
 
226
                if(!name) break;
 
227
                work_queue_task_specify_file(task,name,string_basename(name),WORK_QUEUE_INPUT,WORK_QUEUE_CACHE);
 
228
        }
 
229
 
 
230
        /* advance to the next row/column */
 
231
        xcurrent += xblock;
 
232
 
 
233
        return task;
 
234
}
 
235
 
 
236
void task_complete( struct work_queue_task *t )
 
237
{
 
238
        string_chomp(t->output);
 
239
        printf("%s\n",t->output);
 
240
        work_queue_task_delete(t);
 
241
}
 
242
 
 
243
int main(int argc, char **argv)
 
244
{
 
245
        char c;
 
246
        struct work_queue *q;
 
247
        int port = WORK_QUEUE_DEFAULT_PORT;
 
248
 
 
249
        extra_files_list = list_create();
 
250
 
 
251
        while((c = getopt(argc, argv, "e:f:t:x:y:p:N:E:d:vh")) != (char) -1) {
 
252
                switch (c) {
 
253
                case 'e':
 
254
                        extra_arguments = optarg;
 
255
                        break;
 
256
                case 'f':
 
257
                        list_push_head(extra_files_list,optarg);
 
258
                        break;
 
259
                case 't':
 
260
                        compare_program_time = atof(optarg);
 
261
                        break;
 
262
                case 'x':
 
263
                        xblock = atoi(optarg);
 
264
                        break;
 
265
                case 'y':
 
266
                        yblock = atoi(optarg);
 
267
                        break;
 
268
                case 'p':
 
269
                        port = atoi(optarg);
 
270
                        break;
 
271
                case 'N':
 
272
                        setenv("WORK_QUEUE_NAME", optarg, 1);
 
273
                        break;
 
274
                case 'E':
 
275
                        setenv("WORK_QUEUE_PRIORITY", optarg, 1);
 
276
                        break;
 
277
                case 'd':
 
278
                        debug_flags_set(optarg);
 
279
                        break;
 
280
                case 'v':
 
281
                        show_version(progname);
 
282
                        exit(0);
 
283
                        break;
 
284
                case 'h':
 
285
                        show_help(progname);
 
286
                        exit(0);
 
287
                        break;
 
288
                default:
 
289
                        show_help(progname);
 
290
                        return 1;
 
291
                }
 
292
        }
 
293
 
 
294
        if((argc - optind) < 3) {
 
295
                show_help(progname);
 
296
                exit(1);
 
297
        }
 
298
 
 
299
        struct text_list *seta = text_list_load(argv[optind]);
 
300
        if(!seta) {
 
301
                fprintf(stderr,"%s: couldn't open %s: %s\n",progname,argv[optind+1],strerror(errno));
 
302
                return 1;
 
303
        }
 
304
 
 
305
        fprintf(stderr, "%s: %s has %d elements\n",progname,argv[optind],text_list_size(seta));
 
306
 
 
307
        struct text_list *setb = text_list_load(argv[optind+1]);
 
308
        if(!setb) {
 
309
                fprintf(stderr,"%s: couldn't open %s: %s\n",progname,argv[optind+1],strerror(errno));
 
310
                return 1;
 
311
        }
 
312
 
 
313
        fprintf(stderr, "%s: %s has %d elements\n",progname,argv[optind+1],text_list_size(setb));
 
314
 
 
315
        if (!find_executable("allpairs_multicore","PATH",allpairs_multicore_program,sizeof(allpairs_multicore_program))) {
 
316
                fprintf(stderr,"%s: couldn't find allpairs_multicore in path\n",progname);
 
317
                return 1;
 
318
        }
 
319
 
 
320
        debug(D_DEBUG,"using multicore executable %s",allpairs_multicore_program);
 
321
        
 
322
        if(allpairs_compare_function_get(argv[optind+2])) {
 
323
                strcpy(allpairs_compare_program,argv[optind+2]);
 
324
                debug(D_DEBUG,"using internal function %s",allpairs_compare_program);
 
325
                use_external_program = 0;
 
326
        } else {
 
327
                if(!find_executable(argv[optind+2],"PATH",allpairs_compare_program,sizeof(allpairs_compare_program))) {
 
328
                        fprintf(stderr,"%s: %s is neither an executable nor an internal comparison function.\n",progname,allpairs_compare_program);
 
329
                        return 1;
 
330
                }
 
331
                debug(D_DEBUG,"using comparison executable %s",allpairs_compare_program);
 
332
                use_external_program = 1;
 
333
        }
 
334
 
 
335
        if(!xblock || !yblock) {
 
336
                estimate_block_size(seta,setb,&xblock,&yblock);
 
337
        }
 
338
 
 
339
        fprintf(stderr, "%s: using block size of %dx%d\n",progname,xblock,yblock);
 
340
 
 
341
        q = work_queue_create(port);
 
342
        if(!q) {
 
343
                fprintf(stderr,"%s: could not create work queue on port %d: %s\n",progname,port,strerror(errno));
 
344
                return 1;
 
345
        }
 
346
 
 
347
        fprintf(stderr, "%s: listening for workers on port %d...\n",progname,work_queue_port(q));
 
348
 
 
349
        if(!xstop) xstop = text_list_size(seta);
 
350
        if(!ystop) ystop = text_list_size(setb);
 
351
 
 
352
        while(1) {
 
353
                struct work_queue_task *task = NULL;
 
354
                while(work_queue_hungry(q)) {
 
355
                        task = task_create(seta,setb);
 
356
                        if(task) {
 
357
                                work_queue_submit(q, task);
 
358
                        } else {
 
359
                                break;
 
360
                        }
 
361
                }
 
362
 
 
363
                if(!task && work_queue_empty(q)) break;
 
364
 
 
365
                task = work_queue_wait(q,5);
 
366
                if(task) task_complete(task);
 
367
        }
 
368
 
 
369
        work_queue_delete(q);
 
370
 
 
371
        return 0;
 
372
}