2
Copyright (C) 2008- The University of Notre Dame
3
This software is distributed under the GNU General Public License.
4
See the file COPYING for details.
14
#include "work_queue.h"
16
#include "text_list.h"
17
#include "hash_table.h"
18
#include "stringtools.h"
22
#include "fast_popen.h"
25
#include "allpairs_compare.h"
27
#define ALLPAIRS_LINE_MAX 4096
29
static const char *progname = "allpairs_master";
30
static char allpairs_multicore_program[ALLPAIRS_LINE_MAX] = "allpairs_multicore";
31
static char allpairs_compare_program[ALLPAIRS_LINE_MAX];
33
static double compare_program_time = 0.0;
34
static const char * extra_arguments = "";
35
static int use_external_program = 0;
36
static struct list *extra_files_list = 0;
38
static int xcurrent = 0;
39
static int ycurrent = 0;
40
static int xblock = 0;
41
static int yblock = 0;
45
static void show_version(const char *cmd)
47
printf("%s version %d.%d.%d built by %s@%s on %s at %s\n", cmd, CCTOOLS_VERSION_MAJOR, CCTOOLS_VERSION_MINOR, CCTOOLS_VERSION_MICRO, BUILD_USER, BUILD_HOST, __DATE__, __TIME__);
50
static void show_help(const char *cmd)
52
printf("Usage: %s [options] <set A> <set B> <compare function>\n", cmd);
53
printf("The most common options are:\n");
54
printf(" -p <port> The port that the master will be listening on.\n");
55
printf(" -e <args> Extra arguments to pass to the comparison function.\n");
56
printf(" -f <file> Extra input file needed by the comparison function. (may be given multiple times)\n");
57
printf(" -t <seconds> Estimated time to run one comparison. (default chosen at runtime)\n");
58
printf(" -x <items> Width of one work unit, in items to compare. (default chosen at runtime)\n");
59
printf(" -y <items> Height of one work unit, in items to compare. (default chosen at runtime)\n");
60
printf(" -N <project> Report the master information to a catalog server with the project name - <project>\n");
61
printf(" -E <priority> Priority. Higher the value, higher the priority.\n");
62
printf(" -d <flag> Enable debugging for this subsystem. (Try -d all to start.)\n");
63
printf(" -v Show program version.\n");
64
printf(" -h Display this message.\n");
68
Run the comparison program repeatedly until five seconds have elapsed,
69
in order to get a rough measurement of the execution time.
70
No very accurate for embedded functions.
73
double estimate_run_time( struct text_list *seta, struct text_list *setb )
75
char line[ALLPAIRS_LINE_MAX];
77
time_t starttime, stoptime;
79
fprintf(stderr, "%s: sampling execution time of %s...\n",progname,allpairs_compare_program);
81
sprintf(line,"./%s %s %s %s",
82
string_basename(allpairs_compare_program),
84
text_list_get(seta,0),
91
FILE *file = fast_popen(line);
93
fprintf(stderr,"%s: couldn't execute %s: %s\n",progname,line,strerror(errno));
97
while(fgets(line,sizeof(line),file)) {
98
fprintf(stderr,"%s",line);
105
} while( (stoptime-starttime) < 5 );
107
double t = (stoptime - starttime) / loops;
115
After measuring the function run time, try to choose a
116
squarish work unit that will take just over one minute to complete.
119
void estimate_block_size( struct text_list *seta, struct text_list *setb, int *xblock, int *yblock )
121
if(compare_program_time==0) {
122
if(use_external_program) {
123
compare_program_time = estimate_run_time(seta,setb);
125
compare_program_time = 0.1;
129
fprintf(stderr, "%s: %s estimated at %.02lfs per comparison\n",progname,allpairs_compare_program,compare_program_time);
131
int block_limit = 60;
134
*xblock = *yblock = 1;
137
block_time = *xblock * *yblock * compare_program_time;
138
if(block_time>block_limit) break;
140
if(*xblock < text_list_size(seta)) (*xblock)++;
141
if(*yblock < text_list_size(setb)) (*yblock)++;
143
if(*xblock==text_list_size(seta) && *yblock==text_list_size(setb)) break;
148
Convert a text_list object into a single string that we can
149
pass as a buffer to a remote task via work queue.
152
char * text_list_string( struct text_list *t, int a, int b )
154
static int buffer_size = 128;
155
char *buffer = malloc(buffer_size);
160
const char *str = text_list_get(t,i);
162
str = string_basename(str);
163
while((strlen(str) + buffer_pos + 3)>= buffer_size) {
165
buffer = realloc(buffer,buffer_size);
167
buffer_pos += sprintf(&buffer[buffer_pos],"%s\n",str);
170
buffer[buffer_pos] = 0;
176
Create the next task in order to be submitted to the work queue.
177
Basically, bump the current position in the results matrix by
178
xblock, yblock, and then construct a task with a list of files
179
on each axis, and attach the necessary files.
182
struct work_queue_task * task_create( struct text_list *seta, struct text_list *setb )
187
if(xcurrent>=xstop) {
192
if(ycurrent>=ystop) return 0;
194
char cmd[ALLPAIRS_LINE_MAX];
195
sprintf(cmd,"./%s -e \"%s\" A B %s%s",string_basename(allpairs_multicore_program),extra_arguments,use_external_program ? "./" : "",string_basename(allpairs_compare_program));
196
struct work_queue_task *task = work_queue_task_create(cmd);
198
if(use_external_program) {
199
work_queue_task_specify_file(task,allpairs_compare_program,string_basename(allpairs_compare_program),WORK_QUEUE_INPUT,WORK_QUEUE_CACHE);
202
work_queue_task_specify_file(task,allpairs_multicore_program,string_basename(allpairs_multicore_program),WORK_QUEUE_INPUT,WORK_QUEUE_CACHE);
205
list_first_item(extra_files_list);
206
while((f = list_next_item(extra_files_list))) {
207
work_queue_task_specify_file(task,f,string_basename(f),WORK_QUEUE_INPUT,WORK_QUEUE_CACHE);
210
buf = text_list_string(seta,xcurrent,xcurrent+xblock);
211
work_queue_task_specify_buffer(task,buf,strlen(buf),"A",WORK_QUEUE_NOCACHE);
214
buf = text_list_string(setb,ycurrent,ycurrent+yblock);
215
work_queue_task_specify_buffer(task,buf,strlen(buf),"B",WORK_QUEUE_NOCACHE);
218
for(x=xcurrent;x<(xcurrent+xblock);x++) {
219
name = text_list_get(seta,x);
221
work_queue_task_specify_file(task,name,string_basename(name),WORK_QUEUE_INPUT,WORK_QUEUE_CACHE);
224
for(y=ycurrent;y<(ycurrent+yblock);y++) {
225
name = text_list_get(setb,y);
227
work_queue_task_specify_file(task,name,string_basename(name),WORK_QUEUE_INPUT,WORK_QUEUE_CACHE);
230
/* advance to the next row/column */
236
void task_complete( struct work_queue_task *t )
238
string_chomp(t->output);
239
printf("%s\n",t->output);
240
work_queue_task_delete(t);
243
int main(int argc, char **argv)
246
struct work_queue *q;
247
int port = WORK_QUEUE_DEFAULT_PORT;
249
extra_files_list = list_create();
251
while((c = getopt(argc, argv, "e:f:t:x:y:p:N:E:d:vh")) != (char) -1) {
254
extra_arguments = optarg;
257
list_push_head(extra_files_list,optarg);
260
compare_program_time = atof(optarg);
263
xblock = atoi(optarg);
266
yblock = atoi(optarg);
272
setenv("WORK_QUEUE_NAME", optarg, 1);
275
setenv("WORK_QUEUE_PRIORITY", optarg, 1);
278
debug_flags_set(optarg);
281
show_version(progname);
294
if((argc - optind) < 3) {
299
struct text_list *seta = text_list_load(argv[optind]);
301
fprintf(stderr,"%s: couldn't open %s: %s\n",progname,argv[optind+1],strerror(errno));
305
fprintf(stderr, "%s: %s has %d elements\n",progname,argv[optind],text_list_size(seta));
307
struct text_list *setb = text_list_load(argv[optind+1]);
309
fprintf(stderr,"%s: couldn't open %s: %s\n",progname,argv[optind+1],strerror(errno));
313
fprintf(stderr, "%s: %s has %d elements\n",progname,argv[optind+1],text_list_size(setb));
315
if (!find_executable("allpairs_multicore","PATH",allpairs_multicore_program,sizeof(allpairs_multicore_program))) {
316
fprintf(stderr,"%s: couldn't find allpairs_multicore in path\n",progname);
320
debug(D_DEBUG,"using multicore executable %s",allpairs_multicore_program);
322
if(allpairs_compare_function_get(argv[optind+2])) {
323
strcpy(allpairs_compare_program,argv[optind+2]);
324
debug(D_DEBUG,"using internal function %s",allpairs_compare_program);
325
use_external_program = 0;
327
if(!find_executable(argv[optind+2],"PATH",allpairs_compare_program,sizeof(allpairs_compare_program))) {
328
fprintf(stderr,"%s: %s is neither an executable nor an internal comparison function.\n",progname,allpairs_compare_program);
331
debug(D_DEBUG,"using comparison executable %s",allpairs_compare_program);
332
use_external_program = 1;
335
if(!xblock || !yblock) {
336
estimate_block_size(seta,setb,&xblock,&yblock);
339
fprintf(stderr, "%s: using block size of %dx%d\n",progname,xblock,yblock);
341
q = work_queue_create(port);
343
fprintf(stderr,"%s: could not create work queue on port %d: %s\n",progname,port,strerror(errno));
347
fprintf(stderr, "%s: listening for workers on port %d...\n",progname,work_queue_port(q));
349
if(!xstop) xstop = text_list_size(seta);
350
if(!ystop) ystop = text_list_size(setb);
353
struct work_queue_task *task = NULL;
354
while(work_queue_hungry(q)) {
355
task = task_create(seta,setb);
357
work_queue_submit(q, task);
363
if(!task && work_queue_empty(q)) break;
365
task = work_queue_wait(q,5);
366
if(task) task_complete(task);
369
work_queue_delete(q);