1
/*************************************************************************************************
2
* The command line interface of web crawler
3
* Copyright (C) 2004-2006 Mikio Hirabayashi
4
* This file is part of Hyper Estraier.
5
* Hyper Estraier is free software; you can redistribute it and/or modify it under the terms of
6
* the GNU Lesser General Public License as published by the Free Software Foundation; either
7
* version 2.1 of the License or any later version. Hyper Estraier is distributed in the hope
8
* that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
9
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
10
* License for more details.
11
* You should have received a copy of the GNU Lesser General Public License along with Hyper
12
* Estraier; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330,
13
* Boston, MA 02111-1307 USA.
14
*************************************************************************************************/
19
#define SLEEPUSEC 100000 /* sleep time in microseconds (passed to est_usleep while polling) */
20
#define MINPRIOR 0.01 /* minimal priority of non-seed documents */
21
#define NODERTTNUM 5000 /* number of documents for node rotation (tested as thid % NODERTTNUM) */
23
/* Values of the `mode' argument of proccrawl, selected by runcrawl from the command line.
   NOTE(review): the closing line of this enum is not visible in this listing. */
enum { /* enumeration for crawling modes */
24
CM_CONTINUE, /* continue */
25
CM_RESTART, /* restart (proccrawl drains the queue and removes the trace records) */
26
CM_REVISIT, /* revisit (proccrawl re-enqueues URLs recorded in the trace) */
27
CM_REVCONT /* revisit and continue */
30
/* Per-URL work item that proccrawl allocates and passes to the geturldoc thread routine.
   NOTE(review): some members and the closing line of this typedef are not visible in
   this listing; the type name is presumably TARGSURL (used with sizeof in proccrawl)
   -- confirm against the full file. */
typedef struct { /* type of structure for interaction of a URL */
31
int thid; /* thread ID number */
32
WAVER *waver; /* waver handle */
34
int depth; /* depth */
35
int pid; /* ID number of the parent document */
36
double psim; /* similarity of the parent document */
37
time_t mdate; /* last-modified date */
41
/* global variables */
42
pthread_mutex_t g_mutex = PTHREAD_MUTEX_INITIALIZER; /* global mutex */
43
int g_thnum = 0; /* number of running threads */
44
int g_thseq = 0; /* sequence of thread ID */
45
CBMAP *g_tasks = NULL; /* URLs of running tasks */
46
const char *g_progname; /* program name */
47
int g_sigterm = FALSE; /* flag for termination signal */
48
const char *g_pxhost = NULL; /* host name of the proxy */
49
int g_pxport = 0; /* port number of the proxy */
50
int g_timeout = -1; /* timeout in seconds */
51
int g_inputlang = ESTLANGEN; /* preferred language */
54
/* function prototypes (the visible definitions below appear in this same order) */
55
int main(int argc, char **argv);
56
static void usage(void);
57
static void setsignals(void);
58
static void sigtermhandler(int num);
59
static char *dequeue(WAVER *waver, int *depthp, int *pidp, double *psimp);
60
static void enqueuelinks(WAVER *waver, const char *base, CBLIST *links, CBMAP *kwords,
61
int depth, int id, int pid, double psim);
62
static int runinit(int argc, char **argv);
63
static int runcrawl(int argc, char **argv);
64
static int rununittest(int argc, char **argv);
65
static int runfetch(int argc, char **argv);
66
static int procinit(const char *rootdir, int opts);
67
static int proccrawl(const char *rootdir, int mode);
68
static int procunittest(const char *rootdir);
69
static int procfetch(const char *url);
70
static int strtolang(const char *str);
71
static char *capitalize(const char *str);
72
static void seedurldocs(WAVER *waver, const char *url, int depth, double bias, CBMAP *ulinks);
73
static int accessthnum(int inc);
74
static int puttask(const char *url);
75
static void outtask(const char *url);
76
static void *geturldoc(void *args);
77
static char *urltosavepath(const char *savedir, const char *url);
81
/* Entry point.
   Reads the optional ESTDBGFD environment variable into dpdbgfd, creates the global
   task map and registers it with cbglobalgc together with cbmapclose, initializes the
   network environment, and dispatches on argv[1] to the init, crawl, unittest or fetch
   runner, storing the result in `rv'.
   NOTE(review): local declarations, the argc check and the return statement are not
   visible in this listing; argv[1] is dereferenced below, so confirm that argc is
   validated beforehand. */
int main(int argc, char **argv){
84
if((tmp = getenv("ESTDBGFD")) != NULL) dpdbgfd = atoi(tmp);
88
g_tasks = cbmapopenex(MINIBNUM);
89
cbglobalgc(g_tasks, (void (*)(void *))cbmapclose);
90
if(!est_init_net_env()){
91
log_print(LL_ERROR, "could not initialize network environment");
94
/* release the network environment when the process exits */
atexit(est_free_net_env);
97
/* sub-command dispatch */
if(!strcmp(argv[1], "init")){
98
rv = runinit(argc, argv);
99
} else if(!strcmp(argv[1], "crawl")){
101
rv = runcrawl(argc, argv);
102
} else if(!strcmp(argv[1], "unittest")){
103
rv = rununittest(argc, argv);
104
} else if(!strcmp(argv[1], "fetch")){
105
rv = runfetch(argc, argv);
113
/* print the usage and exit
   Writes the synopsis of every sub-command (init, crawl, unittest, fetch) to stderr,
   prefixed with g_progname.
   NOTE(review): the exit call itself is not visible in this listing; callers such as
   runfetch index argv right after calling this, so they rely on it not returning. */
static void usage(void){
115
fprintf(stderr, "%s: command line interface of web crawler\n", g_progname);
116
fprintf(stderr, "\n");
117
fprintf(stderr, "usage:\n");
118
fprintf(stderr, " %s init [-xs|-xl|-xh] rootdir\n", g_progname);
119
fprintf(stderr, " %s crawl [-restart|-revisit|-revcont] rootdir\n", g_progname);
120
fprintf(stderr, " %s unittest rootdir\n", g_progname);
121
fprintf(stderr, " %s fetch [-proxy host port] [-tout num] [-il lang] url\n", g_progname);
122
fprintf(stderr, "\n");
127
/* set signal handlers
   Signals 1, 2, 3 and 15 are routed to sigtermhandler; signals 13 and 14 are ignored.
   NOTE(review): raw numbers are used instead of the <signal.h> names; on common POSIX
   systems these are presumably SIGHUP, SIGINT, SIGQUIT, SIGTERM, SIGPIPE and SIGALRM
   -- confirm for every target platform. */
static void setsignals(void){
129
est_signal(1, sigtermhandler);
130
est_signal(2, sigtermhandler);
131
est_signal(3, sigtermhandler);
132
est_signal(13, SIG_IGN);
133
est_signal(14, SIG_IGN);
134
est_signal(15, sigtermhandler);
139
/* handler of termination signal
   `num' is the signal number; it is echoed in the message written to stderr.
   NOTE(review): fprintf is not on the POSIX list of async-signal-safe functions.
   The statement that presumably sets g_sigterm is not visible in this listing
   -- confirm against the full file. */
static void sigtermhandler(int num){
142
fprintf(stderr, "%s: the termination signal %d catched\n", g_progname, num);
146
/* dequeue a URL from the priority queue (it has a critical section inside)
   `waver' is the crawler handle whose queue is read.
   `depthp', `pidp' and `psimp' receive the fields parsed from the queue record; sibling
   code in this file writes records as "depth:pid:psim", a tab, then the URL.
   Returns NULL immediately once the crawl period (stime + period) has elapsed; the
   URL handed back is a copy made with cbmemdup.
   NOTE(review): several lines (declarations, error returns, the final return) are not
   visible in this listing. */
static char *dequeue(WAVER *waver, int *depthp, int *pidp, double *psimp){
149
if(time(NULL) > waver->stime + waver->period) return NULL;
150
if(pthread_mutex_lock(&g_mutex) != 0){
151
log_print(LL_ERROR, "could not get mutex");
154
/* document budget exceeded: leave the critical section */
if(waver->curnum > waver->docnum){
155
pthread_mutex_unlock(&g_mutex);
158
/* the queue is empty but worker threads are still running: release the mutex, sleep
   and re-check, since a running worker may still enqueue links */
/* NOTE(review): g_thnum is read here under g_mutex, while accessthnum uses its own
   static mutex -- confirm both guard the counter consistently */
while(queue_rnum(waver->queue) < 1 && g_thnum > 0){
159
pthread_mutex_unlock(&g_mutex);
160
est_usleep(SLEEPUSEC);
161
if(pthread_mutex_lock(&g_mutex) != 0){
162
log_print(LL_ERROR, "could not get mutex");
171
if(!(buf = queue_dequeue(waver->queue))) break;
172
/* a usable record has a tab separator and a non-negative leading depth */
if((rp = strchr(buf, '\t')) != NULL && (*depthp = atoi(buf)) >= 0){
174
rv = cbmemdup(rp, -1);
175
/* walk the colon-separated fields that precede the tab */
if((rp = strchr(buf, ':')) != NULL){
178
if((rp = strchr(rp, ':')) != NULL){
180
*psimp = strtod(rp, NULL);
188
pthread_mutex_unlock(&g_mutex);
193
/* add traced URLs (it should be called in a critical section)
   `waver'  crawler handle; queue, site counters, rules and tuning values are read from it.
   `base'   URL of the document the links were extracted from.
   `links'  list of link URLs; at most the first 1024 entries are considered.
   `kwords' keywords of the base document, or NULL when none are available.
   `depth'  depth of the base document; nothing is enqueued once it reaches maxdepth.
   `id'     value written as the parent ID field of every enqueued record.
   `pid'    ID of the base document's own parent, used to look up that parent's keywords.
   `psim'   similarity value inherited from the parent.
   NOTE(review): many lines of this function (else branches, frees, closing braces,
   case labels) are missing from this listing, so the notes below describe only the
   statements that are visible. */
static void enqueuelinks(WAVER *waver, const char *base, CBLIST *links, CBMAP *kwords,
195
int depth, int id, int pid, double psim){
197
CBMAP *pkwords, *ulinks;
199
char numbuf[NUMBUFSIZ], *ubuf, *pv, *benc, *tenc;
200
int i, j, vsiz, lnum, llen, slash, vnum, *svec, *tvec, num, allow;
201
double similarity, lnumtune, depthtune, masstune, priority, remoteness;
202
if(depth >= waver->maxdepth) return;
203
/* without document keywords or seed keywords, fall back to a damped parent similarity */
if(!kwords || cbmaprnum(waver->kwords) < 1){
204
similarity = psim * 0.7;
207
/* cosine similarity between the seed keyword vector and this document's keywords,
   scaled into the range starting at 0.1, then blended with psim by waver->inherit */
vnum = waver->seedkeynum;
208
svec = cbmalloc(vnum * sizeof(int));
209
tvec = cbmalloc(vnum * sizeof(int));
210
est_vector_set_seed(waver->kwords, svec, vnum);
211
est_vector_set_target(waver->kwords, kwords, tvec, vnum);
212
similarity = est_vector_cosine(svec, tvec, vnum) * 0.9 + 0.1;
213
similarity = similarity * (1.0 - waver->inherit) + psim * waver->inherit;
217
/* when the parent's keywords are stored, reduce the similarity the more this document
   resembles its parent */
if(pid > 0 && (pkwords = est_mtdb_get_keywords(waver->index, pid))){
218
vnum = waver->savekeynum;
219
svec = cbmalloc(vnum * sizeof(int));
220
tvec = cbmalloc(vnum * sizeof(int));
221
est_vector_set_seed(pkwords, svec, vnum);
222
est_vector_set_target(pkwords, kwords, tvec, vnum);
223
similarity *= 1.0 - pow(est_vector_cosine(svec, tvec, vnum), 3.14) * 0.8;
229
/* count one more document for the site prefix of `base' (scheme, host and the first
   slash after them) in waver->sites */
if((pv = strstr(base, "://")) != NULL && (pv = strchr(pv + 3, '/')) != NULL){
230
vbuf = cbmapget(waver->sites, base, pv - base + 1, NULL);
231
num = (vbuf ? atoi(vbuf) : 0) + 1;
232
sprintf(numbuf, "%d", num);
233
cbmapput(waver->sites, base, pv - base + 1, numbuf, -1, TRUE);
234
/* shrink the site map once it grows 40% past its target size */
if(cbmaprnum(waver->sites) > (waver->queuesize / 3.0 + 1) * 1.4){
235
log_print(LL_INFO, "site map sliming: %d", cbmaprnum(waver->sites));
236
kwords_reduce(waver->sites, waver->queuesize / 3.0 + 1, TRUE);
239
/* `ulinks' de-duplicates links within this call; the base URL is seeded into it so
   self-links are skipped */
ulinks = cbmapopenex(MINIBNUM);
240
cbmapput(ulinks, base, -1, "", 0, FALSE);
241
lnum = cblistnum(links) + 4;
242
lnumtune = pow(lnum, 0.7);
243
depthtune = pow(depth + 7, 0.8);
244
for(i = 0; i < cblistnum(links) && i < 1024; i++){
245
vbuf = cblistval(links, i, &vsiz);
246
ubuf = cbmemdup(vbuf, vsiz);
247
/* drop the fragment part */
if((pv = strchr(ubuf, '#')) != NULL) *pv = '\0';
249
/* skip over-long URLs and ones already seen in this call */
if(llen > 1024 || cbmapget(ulinks, ubuf, llen, NULL)){
253
cbmapput(ulinks, ubuf, -1, "", 0, FALSE);
255
/* apply the visit rules in list order; a later matching rule overrides an earlier one */
for(j = 0; j < cblistnum(waver->pmrules); j++){
256
pmrule = (PMRULE *)cblistval(waver->pmrules, j, NULL);
257
switch(pmrule->visit){
259
if(est_regex_match(pmrule->regex, ubuf)) allow = TRUE;
262
if(est_regex_match(pmrule->regex, ubuf)) allow = FALSE;
271
/* damp the priority of sites that already contributed more than masscheck documents */
if((pv = strstr(ubuf, "://")) != NULL && (pv = strchr(pv + 3, '/')) != NULL){
272
vbuf = cbmapget(waver->sites, ubuf, pv - ubuf + 1, NULL);
273
num = (vbuf ? atoi(vbuf) : 0) + 1;
274
if(num > waver->masscheck) masstune /= sqrt((double)num / waver->masscheck);
277
/* `slash' grows with path and query separators; '?' weighs five times as much */
for(pv = ubuf; *pv != '\0'; pv++){
279
case '/': slash += 1; break;
280
case '?': slash += 5; break;
281
case '&': slash += 1; break;
282
case ';': slash += 1; break;
286
/* compare base and target with their query strings removed, then with their last path
   segment removed; the statements that set `remoteness' from these comparisons are
   not visible in this listing */
benc = cbmemdup(base, -1);
287
tenc = cbmemdup(ubuf, -1);
288
if((pv = strchr(benc, '?')) != NULL) pv[0] = '\0';
289
if((pv = strchr(tenc, '?')) != NULL) pv[0] = '\0';
290
if(!strcmp(tenc, benc)){
294
if((pv = strrchr(benc, '/')) != NULL) pv[1] = '\0';
295
if((pv = strrchr(tenc, '/')) != NULL) pv[1] = '\0';
296
if(cbstrfwmatch(tenc, benc)){
303
/* priority formula per crawling strategy (the case labels are not visible in this
   listing); links that appear earlier in the document get a slightly higher value */
switch(waver->strategy){
305
priority = (similarity * 128 * masstune) / depthtune / lnumtune / slash;
306
priority *= (lnum - (i / remoteness)) / lnum;
307
if(llen > 80) priority /= pow(llen / 80.0, 0.7);
310
priority = similarity;
311
priority *= 0.9 + ((lnum - i) / (double)lnum) / 50;
314
priority = (depth + 1) * 0.001;
315
priority = priority > 1.0 ? 1.0 : priority;
316
priority *= 0.9 + ((lnum - i) / (double)lnum) / 50;
319
priority = 1.0 / (depth + 1);
320
priority *= 0.9 + ((lnum - i) / (double)lnum) / 50;
323
priority = (est_random() * 128 * masstune) / depthtune / lnumtune / slash;
324
priority *= (lnum - (i / remoteness)) / lnum;
325
if(llen > 80) priority /= pow(llen / 80.0, 0.7);
328
priority = (128 * masstune) / depthtune / lnumtune / slash;
329
priority *= (lnum - (i / remoteness)) / lnum;
330
if(llen > 80) priority /= pow(llen / 80.0, 0.7);
333
/* NOTE(review): the record stores `psim', not the `similarity' computed above, so the
   value handed to the child is the inherited one -- confirm this is intended */
tenc = cbsprintf("%d:%d:%.5f\t%s", depth + 1, id, psim, ubuf);
334
/* the queue key is 1.0 - priority (presumably smaller keys dequeue first -- confirm) */
queue_enqueue(waver->queue, tenc, 1.0 - priority);
339
/* trim the queue back to queuesize once it overshoots by 40% */
if(queue_rnum(waver->queue) > waver->queuesize * 1.4){
340
log_print(LL_INFO, "queue sliming: %d", queue_rnum(waver->queue));
341
if(!queue_slim(waver->queue, waver->queuesize))
342
log_print(LL_ERROR, "queue sliming failed");
347
/* parse arguments of the init command
   Scans argv from index 2; arguments starting with '-' that precede the root directory
   are matched against -xs, -xl and -xh.  Calls usage() when no root directory was
   given, otherwise passes `rootdir' and `opts' to procinit and keeps the result in `rv'.
   NOTE(review): the statements that set `opts' and `rootdir' are not visible in this
   listing. */
static int runinit(int argc, char **argv){
353
for(i = 2; i < argc; i++){
354
if(!rootdir && argv[i][0] == '-'){
355
if(!strcmp(argv[i], "-xs")){
357
} else if(!strcmp(argv[i], "-xl")){
359
} else if(!strcmp(argv[i], "-xh")){
370
if(!rootdir) usage();
371
rv = procinit(rootdir, opts);
376
/* parse arguments of the crawl command
   Scans argv from index 2; arguments starting with '-' that precede the root directory
   are matched against -restart, -revisit and -revcont.  Calls usage() when no root
   directory was given, otherwise passes `rootdir' and `mode' to proccrawl.
   NOTE(review): the statements that set `mode' (presumably to the CM_* constants) and
   `rootdir' are not visible in this listing. */
static int runcrawl(int argc, char **argv){
382
for(i = 2; i < argc; i++){
383
if(!rootdir && argv[i][0] == '-'){
384
if(!strcmp(argv[i], "-restart")){
386
} else if(!strcmp(argv[i], "-revisit")){
388
} else if(!strcmp(argv[i], "-revcont")){
399
if(!rootdir) usage();
400
rv = proccrawl(rootdir, mode);
405
/* parse arguments of the unittest command
   Scans argv from index 2 for the root directory, calls usage() when none was given,
   and otherwise passes it to procunittest.
   NOTE(review): the handling of '-' arguments is not visible in this listing. */
static int rununittest(int argc, char **argv){
410
for(i = 2; i < argc; i++){
411
if(!rootdir && argv[i][0] == '-'){
419
if(!rootdir) usage();
420
rv = procunittest(rootdir);
425
/* parse arguments of the fetch command
   Options recognised before the URL:
     -proxy host port   stores the port in g_pxport (the host assignment is not visible)
     -tout num          stores the timeout in g_timeout
     -il lang           stores the language from strtolang in g_inputlang
   Each option value is read after an index bump that calls usage() when argv runs out.
   NOTE(review): numeric values are converted with atoi, so malformed input silently
   becomes 0. */
static int runfetch(int argc, char **argv){
430
for(i = 2; i < argc; i++){
431
if(!url && argv[i][0] == '-'){
432
if(!strcmp(argv[i], "-proxy")){
433
if(++i >= argc) usage();
435
if(++i >= argc) usage();
436
g_pxport = atoi(argv[i]);
437
} else if(!strcmp(argv[i], "-tout")){
438
if(++i >= argc) usage();
439
g_timeout = atoi(argv[i]);
440
} else if(!strcmp(argv[i], "-il")){
441
if(++i >= argc) usage();
442
g_inputlang = strtolang(argv[i]);
458
/* perform the init command
   Creates the root directory `rootdir' through waver_init with the option flags `opts',
   logging an error on failure; on success opens the log file inside it and records
   the creation.
   NOTE(review): the return statements are not visible in this listing. */
static int procinit(const char *rootdir, int opts){
460
if(!waver_init(rootdir, opts)){
461
log_print(LL_ERROR, "initializing the root directory failed");
464
log_open(rootdir, LOGFILE, LL_INFO, FALSE);
465
log_print(LL_INFO, "the root directory created");
470
/* perform the crawl command
   `rootdir' is the crawler root directory opened with waver_open.
   `mode' is one of CM_CONTINUE, CM_RESTART, CM_REVISIT, CM_REVCONT.
   Visible flow: open the handle, prepare the queue and trace according to `mode', fetch
   the seed documents to collect seed keywords, then repeatedly dequeue URLs and hand
   each one to geturldoc (in a new thread when waver->thnum > 1), and finally wait for
   the threads and close the handle.
   NOTE(review): many lines (declarations, else branches, frees, closing braces, the
   return) are missing from this listing, so the notes below describe only the
   statements that are visible. */
static int proccrawl(const char *rootdir, int mode){
477
const char *kbuf, *rp;
478
char *url, *rec, *tmp, *endurl;
479
int i, err, depth, pid, thid, locked, ended;
482
if(!(waver = waver_open(rootdir))){
483
log_print(LL_ERROR, "%s: could not open", rootdir);
489
log_print(LL_INFO, "crawling started (continue)");
492
log_print(LL_INFO, "crawling started (restart)");
495
log_print(LL_INFO, "crawling started (revisit)");
498
log_print(LL_INFO, "crawling started (revcont)");
501
/* restart: empty the priority queue */
if(mode == CM_RESTART){
502
while((tmp = queue_dequeue(waver->queue)) != NULL){
507
/* revisit modes: re-enqueue every traced URL that is still registered in the index */
if(mode == CM_REVISIT || mode == CM_REVCONT){
509
criterinit(waver->trace);
510
while((url = criternext(waver->trace, NULL)) != NULL){
511
if((rec = crget(waver->trace, url, -1, 0, -1, NULL)) != NULL){
512
if(est_mtdb_uri_to_id(waver->index, url) > 0){
513
/* the trace record starts with the fetch time, followed by colon-separated fields
   (geturldoc writes it as time:depth:pid:psim#node) */
mdate = (time_t)strtod(rec, NULL);
517
if((rp = strchr(rec, ':')) != NULL){
520
if((rp = strchr(rp, ':')) != NULL){
523
if((rp = strchr(rp, ':')) != NULL){
525
psim = strtod(rp, NULL);
529
tmp = cbsprintf("%d:%d:%.5f\t%s", depth, pid, psim, url);
530
/* NOTE(review): if `mdate' and `t' are both integer types, mdate / t truncates before
   the multiplication; the declaration of `t' is not visible here -- confirm */
queue_enqueue(waver->queue, tmp, (mdate / t) * MINPRIOR);
539
/* restart: remove every record from the trace */
if(mode == CM_RESTART){
540
criterinit(waver->trace);
541
while((url = criternext(waver->trace, NULL)) != NULL){
542
crout(waver->trace, url, -1);
546
/* fetch each seed URL; the value stored with the seed is parsed as its bias */
cbmapiterinit(waver->seeds);
547
ulinks = cbmapopen();
548
while((kbuf = cbmapiternext(waver->seeds, NULL)) != NULL){
549
seedurldocs(waver, kbuf, 0, strtod(cbmapget(waver->seeds, kbuf, -1, NULL), NULL), ulinks);
552
/* keep only the strongest seed keywords and log them */
kwords_reduce(waver->kwords, waver->seedkeynum, FALSE);
553
kwbuf = cbdatumopen(NULL, -1);
554
cbmapiterinit(waver->kwords);
555
for(i = 0; (kbuf = cbmapiternext(waver->kwords, NULL)) != NULL; i++){
556
if(i > 0) est_datum_printf(kwbuf, ", ");
557
est_datum_printf(kwbuf, "%s (%s)", kbuf, cbmapget(waver->kwords, kbuf, -1, NULL));
559
log_print(LL_DEBUG, "seed keywords: %s", cbdatumptr(kwbuf));
563
/* main loop: runs until a termination signal, the end marker, or an empty dequeue */
while(!g_sigterm && !ended && (tmp = dequeue(waver, &depth, &pid, &psim)) != NULL){
564
/* the URL remembered in `endurl' came around again */
if(endurl && !strcmp(tmp, endurl)){
565
est_usleep(SLEEPUSEC);
566
if(mode == CM_REVISIT){
569
log_print(LL_INFO, "waiting for threads: %d", accessthnum(0));
571
while(accessthnum(0) > 0 && time(NULL) < t + waver->timeout * 2 + 1){
572
est_usleep(SLEEPUSEC);
579
/* skip URLs whose last fetch is still within the revisit span */
if(pthread_mutex_lock(&g_mutex) == 0){
580
if((rec = crget(waver->trace, tmp, -1, 0, -1, NULL)) != NULL){
581
mdate = (time_t)strtod(rec, NULL);
584
pthread_mutex_unlock(&g_mutex);
585
if(mdate + waver->revisit >= time(NULL)){
586
log_print(LL_DEBUG, "not modified: %s", tmp);
591
log_print(LL_ERROR, "could not get mutex");
593
/* when nodes are configured, re-select the current node if none is chosen, minload
   has reached 1.0, every NODERTTNUM thread IDs, or every tenth of that when the
   current node's load exceeds 0.85 */
if(cbmaprnum(waver->nodes) > 0 &&
594
(waver->curnode < 1 || waver->minload >= 1.0 || thid % NODERTTNUM == 0 ||
595
(thid % (NODERTTNUM / 10) == 0 && waver_current_node_load(waver) > 0.85))){
596
waver_set_current_node(waver);
597
log_print(LL_INFO, "current node changed: %d: %f", waver->curnode, waver->minload);
600
/* build the argument block for geturldoc (other member assignments are not visible) */
targs = cbmalloc(sizeof(TARGSURL));
602
targs->waver = waver;
604
targs->depth = depth;
607
targs->mdate = mdate;
608
/* multi-threaded mode: wait for a free slot, then start a thread for this URL */
if(waver->thnum > 1){
609
while(accessthnum(0) >= waver->thnum){
610
est_usleep(SLEEPUSEC);
612
if(pthread_create(&th, NULL, geturldoc, targs) == 0){
614
/* stagger the start of the first threads */
if(thid <= waver->thnum) est_usleep(SLEEPUSEC);
622
/* status report; the mutex is taken on a best-effort basis and the report is logged
   even when locking fails */
locked = pthread_mutex_lock(&g_mutex) == 0;
623
log_print(LL_INFO, "status: dnum=%d, wnum=%d, size=%.0f, queue=%d",
624
est_mtdb_doc_num(waver->index), est_mtdb_word_num(waver->index),
625
est_mtdb_size(waver->index), queue_rnum(waver->queue) + 1);
626
if(locked) pthread_mutex_unlock(&g_mutex);
627
est_usleep(SLEEPUSEC);
630
est_usleep(SLEEPUSEC);
631
/* after the loop: wait for running threads, giving up once timeout * 8 + 60 seconds
   have passed since `t' */
if(waver->thnum > 1){
632
log_print(LL_INFO, "waiting for threads: %d", accessthnum(0));
634
while(accessthnum(0) > 0){
635
if(time(NULL) > t + waver->timeout * 8 + 60){
636
log_print(LL_WARN, "thread waiting timed out: %d", accessthnum(0));
638
est_usleep(1000 * 1000 * 5);
641
est_usleep(SLEEPUSEC);
645
log_print(LL_INFO, "crawling finished");
646
/* close the handle while holding the global mutex when it can be obtained */
locked = pthread_mutex_lock(&g_mutex) == 0;
648
if(!waver_close(waver)){
649
log_print(LL_ERROR, "%s: closing failed", rootdir);
652
if(locked) pthread_mutex_unlock(&g_mutex);
653
if(!err) log_print(LL_INFO, "finished successfully");
658
/* perform the unittest command
   Initializes and opens a waver handle under `rootdir', then exercises the seed map,
   the priority queue (enqueue, range setting, slimming, dequeue) and the keyword map
   with generated data before closing the handle.  Failures set `err' and are logged.
   NOTE(review): declarations, some checks and the return are not visible in this
   listing. */
static int procunittest(const char *rootdir){
662
CBMAP *seeds, *kwords;
663
char uri[URIBUFSIZ], *vbuf;
665
log_print(LL_INFO, "initializing the waver handle");
666
if(!waver_init(rootdir, 0)){
667
log_print(LL_ERROR, "%s: initializing failed", rootdir);
670
log_print(LL_INFO, "opening the waver handle");
671
if(!(waver = waver_open(rootdir))){
672
log_print(LL_ERROR, "%s: opening failed", rootdir);
676
log_print(LL_INFO, "checking seeding");
677
seeds = waver->seeds;
678
/* 100 generated seed URLs with random hexadecimal path parts */
for(i = 0; i < 100; i++){
679
sprintf(uri, "http://%05d/%x/%x.html",
680
i + 1, (int)(est_random() * 0x1000000), (int)(est_random() * 0x1000000));
681
cbmapput(seeds, uri, -1, "", 0, TRUE);
683
log_print(LL_INFO, "checking priority queue");
684
queue = waver->queue;
685
/* 100 generated queue records in the depth:pid:psim, tab, URL format */
for(i = 0; i < 100; i++){
686
sprintf(uri, "0:0:0.0\thttp://%05d/%x/%x.html",
687
i + 1, (int)(est_random() * 0x1000000), (int)(est_random() * 0x1000000));
688
if(!queue_enqueue(queue, uri, est_random())){
692
if(i % 10 == 0) queue_set_range(queue, i);
694
if(queue_rnum(queue) != 100) err = TRUE;
695
if(err) log_print(LL_ERROR, "%s: enqueue failed", rootdir);
696
/* NOTE(review): `err' is sticky, so an earlier enqueue failure also triggers the
   "slim failed" message below */
if(!queue_slim(queue, 60)) err = TRUE;
697
if(err) log_print(LL_ERROR, "%s: slim failed", rootdir);
698
/* drain the queue, counting the records in `i' */
for(i = 0; (vbuf = queue_dequeue(queue)) != NULL; i++){
703
log_print(LL_ERROR, "%s: dequeue failed", rootdir);
705
log_print(LL_INFO, "checking keyword map");
706
kwords = waver->kwords;
707
/* 10000 additions of random numeric keywords with random scores, then reduce to 100 */
for(i = 0; i < 10000; i++){
708
sprintf(uri, "%d", (int)(est_random() * 1000));
709
kwords_add(kwords, uri, (int)(est_random() * 1000));
711
kwords_reduce(kwords, 100, TRUE);
712
log_print(LL_INFO, "closing the waver handle");
713
if(!waver_close(waver)){
714
log_print(LL_ERROR, "%s: closing failed", rootdir);
717
if(!err) log_print(LL_INFO, "finished successfully");
722
/* perform the fetch command
   Fetches `url' with the global proxy, timeout and language settings and prints the
   result to stdout as a MIME multipart/mixed message with three parts: the document
   draft, the extracted links, and the original response headers followed by the raw
   entity body.
   NOTE(review): declarations, frees, some header branches and the return are not
   visible in this listing. */
static int procfetch(const char *url){
728
const char *border, *vbuf;
731
raw = cbdatumopen(NULL, -1);
733
links = cblistopen();
735
if(!fetch_document(url, g_pxhost, g_pxport, g_timeout, -1, NULL, NULL, &code, raw, heads,
736
links, NULL, doc, g_inputlang)){
737
log_print(LL_WARN, "could not get: %d: %s", code, url);
744
/* top-level message headers */
border = est_border_str();
745
printf("URL: %s\r\n", url);
746
str = cbdatestrhttp(time(NULL), 0);
747
printf("Date: %s\r\n", str);
749
/* the subject is the document title when present, otherwise the URL, MIME-encoded */
str = cbmimeencode((vbuf = est_doc_attr(doc, ESTDATTRTITLE)) ? vbuf : url, "UTF-8", TRUE);
750
printf("Subject: [estwaver] %s\r\n", str);
752
printf("Content-Type: multipart/mixed; boundary=%s\r\n", border);
754
/* NOTE(review): this line ends with a bare newline while the surrounding header lines
   end with CR LF -- confirm that is intended */
printf("This is a multi-part message in MIME format.\n");
756
/* part 1: document draft */
printf("--%s\r\n", border);
757
printf("Content-Type: text/x-estraier-draft\r\n");
758
printf("X-Estwaver-Role: draft\r\n");
760
str = est_doc_dump_draft(doc);
764
/* part 2: extracted links, one per line */
printf("--%s\r\n", border);
765
printf("Content-Type: text/plain\r\n");
766
printf("X-Estwaver-Role: links\r\n");
768
for(i = 0; i < cblistnum(links); i++){
769
printf("%s\n", cblistval(links, i, NULL));
772
/* part 3: original response headers; some are renamed with an X-Original- prefix and
   the remaining names are capitalized before printing */
printf("--%s\r\n", border);
773
cbmapiterinit(heads);
774
while((vbuf = cbmapiternext(heads, &vsiz)) != NULL){
776
printf("X-Original-HTTP-Response: %s\r\n", cbmapget(heads, vbuf, vsiz, NULL));
777
} else if(!strcmp(vbuf, "content-encoding")){
778
printf("X-Original-Content-Encoding: %s\r\n", cbmapget(heads, vbuf, vsiz, NULL));
780
str = capitalize(vbuf);
781
printf("%s: %s\r\n", str, cbmapget(heads, vbuf, vsiz, NULL));
785
printf("X-Estwaver-Role: raw\r\n");
787
/* the raw body is written with fwrite using its recorded size */
fwrite(cbdatumptr(raw), 1, cbdatumsize(raw), stdout);
789
/* closing boundary */
printf("--%s--\r\n", border);
798
/* get the language value
   Maps the codes "en", "ja", "zh" and "ko" to ESTLANGEN, ESTLANGJA, ESTLANGZH and
   ESTLANGKO; the comparison uses cbstricmp (presumably case-insensitive -- confirm).
   NOTE(review): the fallback return for any other string is not visible in this
   listing. */
static int strtolang(const char *str){
800
if(!cbstricmp(str, "en")) return ESTLANGEN;
801
if(!cbstricmp(str, "ja")) return ESTLANGJA;
802
if(!cbstricmp(str, "zh")) return ESTLANGZH;
803
if(!cbstricmp(str, "ko")) return ESTLANGKO;
808
/* make a capitalized string
   Works on a copy of `str' made with cbmemdup.  The `cap' flag is set after a space
   or a hyphen, and a lower-case ASCII letter seen while it is set is the one to
   convert.
   NOTE(review): the initial value of `cap', the conversion statement and the return
   are not visible in this listing. */
static char *capitalize(const char *str){
812
buf = cbmemdup(str, -1);
814
for(i = 0; buf[i] != '\0'; i++){
815
if(cap && buf[i] >= 'a' && buf[i] <= 'z'){
818
cap = buf[i] == ' ' || buf[i] == '-';
824
/* get keywords of a seed document
   `waver'  crawler handle.
   `url'    URL to fetch; it is also enqueued for the crawl with priority depth * MINPRIOR.
   `depth'  recursion depth, 0 for a configured seed.
   `bias'   weight applied to the keyword scores of this document.
   `ulinks' map of URLs already visited during seeding, shared across the recursion.
   When the fetch yields code 200 and a non-empty document, its keywords are added to
   waver->kwords scaled by `bias'; while depth is below seeddepth and bias is positive,
   the function recurses into each allowed link with the bias divided by lnumtune.
   Returns at once when a termination signal has been flagged.
   NOTE(review): declarations, frees and closing braces are not visible in this listing. */
static void seedurldocs(WAVER *waver, const char *url, int depth, double bias, CBMAP *ulinks){
832
int i, j, code, ksiz, num, len, allow;
834
if(g_sigterm) return;
835
/* queue record: depth, parent ID 0, similarity 1.0 */
ubuf = cbsprintf("%d:0:1.0\t%s", depth, url);
836
queue_enqueue(waver->queue, ubuf, depth * MINPRIOR);
838
links = cblistopen();
840
log_print(LL_INFO, "fetching: %d: %s", depth, url);
841
/* the return value of fetch_document is not used here; the decision below relies on
   `code' only */
fetch_document(url, waver->pxhost, waver->pxport, waver->timeout * 2, 0,
842
waver->urlrules, waver->mtrules, &code, NULL, NULL, links,
843
waver->unrules, doc, waver->language);
844
if(code == 200 && !est_doc_is_empty(doc)){
845
log_print(LL_INFO, "seeding: %.3f: %s", bias, url);
846
kwords = est_morph_etch_doc(doc, waver->seedkeynum);
847
cbmapiterinit(kwords);
848
while((kbuf = cbmapiternext(kwords, &ksiz)) != NULL){
849
/* the product is truncated to int; keywords scaled down to 0 are dropped */
num = atoi(cbmapget(kwords, kbuf, ksiz, NULL)) * bias;
850
if(num > 0) kwords_add(waver->kwords, kbuf, num);
852
if(depth < waver->seeddepth && bias > 0.0){
853
/* the bias handed down shrinks with the square root of the link count plus two */
lnumtune = pow(cblistnum(links) + 2, 0.5);
854
for(i = 0; i < cblistnum(links); i++){
855
ubuf = cbmemdup(cblistval(links, i, NULL), -1);
856
/* drop the fragment part */
if((pv = strchr(ubuf, '#')) != NULL) *pv = '\0';
858
/* skip over-long URLs, URLs already visited, and configured seeds */
if(len > 1024 || cbmapget(ulinks, ubuf, len, NULL) ||
859
cbmapget(waver->seeds, ubuf, len, NULL)){
863
cbmapput(ulinks, ubuf, -1, "", 0, FALSE);
865
/* apply the visit rules in list order; a later matching rule overrides an earlier one */
for(j = 0; j < cblistnum(waver->pmrules); j++){
866
pmrule = (PMRULE *)cblistval(waver->pmrules, j, NULL);
867
switch(pmrule->visit){
869
if(est_regex_match(pmrule->regex, ubuf)) allow = TRUE;
872
if(est_regex_match(pmrule->regex, ubuf)) allow = FALSE;
876
if(allow) seedurldocs(waver, ubuf, depth + 1, bias / lnumtune, ulinks);
882
log_print(LL_INFO, "ignored: %d: %s", code, url);
889
/* access the number of threads
   `inc' is presumably the increment applied to the counter; callers in this file pass
   0 to read it.  The access is guarded by a function-local static mutex, separate
   from g_mutex.
   NOTE(review): the statements that update and return the counter are not visible in
   this listing -- confirm the semantics of `inc'. */
static int accessthnum(int inc){
891
static pthread_mutex_t mymutex = PTHREAD_MUTEX_INITIALIZER;
893
if(pthread_mutex_lock(&mymutex) != 0){
894
log_print(LL_ERROR, "could not get mutex");
899
pthread_mutex_unlock(&mymutex);
904
/* put the current task
   Registers `url' in the global task map g_tasks under g_mutex.  cbmapput is called
   with the overwrite flag FALSE, and the branch taken when it reports failure
   presumably signals that the URL is already being processed -- confirm.
   NOTE(review): the return statements are not visible in this listing. */
static int puttask(const char *url){
906
if(pthread_mutex_lock(&g_mutex) != 0){
907
log_print(LL_ERROR, "could not get mutex");
910
if(!cbmapput(g_tasks, url, -1, "", 0, FALSE)){
911
pthread_mutex_unlock(&g_mutex);
914
pthread_mutex_unlock(&g_mutex);
919
/* remove the current task
   Deletes `url' from the global task map g_tasks under g_mutex; logs an error when
   the mutex cannot be obtained. */
static void outtask(const char *url){
921
if(pthread_mutex_lock(&g_mutex) != 0){
922
log_print(LL_ERROR, "could not get mutex");
925
cbmapout(g_tasks, url, -1);
926
pthread_mutex_unlock(&g_mutex);
931
/* get a document of URL
   Thread routine started by proccrawl; `args' is the per-URL argument block (accessed
   as `myargs').
   Visible flow: fetch the document, re-enqueue the URL when a termination is detected,
   record the visit in the trace, register the document in the current node or the
   local index when the indexing rules allow it, enqueue its links, delete documents
   that are no longer alive, and optionally save the draft and entity files and build
   the post-processing command.
   NOTE(review): many lines (declarations, else branches, frees, closing braces, the
   return) are missing from this listing, so the notes below describe only the
   statements that are visible. */
static void *geturldoc(void *args){
936
CBMAP *heads, *kwords;
940
char *url, *rec, *dpath, *epath;
941
char numbuf[NUMBUFSIZ], *tmp;
942
int i, thid, id, alive, code, allow;
947
waver = myargs->waver;
950
log_print(LL_DEBUG, "[%d]: early collision: %s", thid, url);
956
/* waver->interval is multiplied by 1000 for est_usleep, so it is presumably given in
   milliseconds -- confirm */
if(waver->interval > 0) est_usleep(waver->interval * 1000);
958
raw = cbdatumopen(NULL, -1);
960
links = cblistopen();
962
log_print(LL_INFO, "[%d]: fetching: %d: %s", thid, myargs->depth, url);
963
/* myargs->mdate is passed to fetch_document; a 304 result code is treated specially
   further down */
fetch_document(url, waver->pxhost, waver->pxport, waver->timeout, myargs->mdate,
964
waver->urlrules, waver->mtrules, &code, raw, heads, links,
965
waver->unrules, doc, waver->language);
967
/* terminated: put the URL back into the queue with the minimal priority value */
log_print(LL_WARN, "[%d]: terminated: %s", thid, url);
968
if(pthread_mutex_lock(&g_mutex) == 0){
969
tmp = cbsprintf("%d:%d:%.5f\t%s", myargs->depth, myargs->pid, myargs->psim, url);
970
queue_enqueue(waver->queue, tmp, MINPRIOR);
972
pthread_mutex_unlock(&g_mutex);
984
/* critical section: trace and index updates */
if(pthread_mutex_lock(&g_mutex) == 0){
986
/* the URL was recorded within the revisit span while this fetch was running */
if((rec = crget(waver->trace, url, -1, 0, -1, NULL)) != NULL){
987
if(strtod(rec, NULL) + waver->revisit >= now){
988
log_print(LL_DEBUG, "[%d]: late collision: %s", thid, url);
990
pthread_mutex_unlock(&g_mutex);
1003
/* trace record: time:depth:pid:psim#node */
sprintf(numbuf, "%.0f:%d:%d:%.5f#%d",
1004
now, myargs->depth, myargs->pid, myargs->psim, waver->curnode);
1005
crput(waver->trace, url, -1, numbuf, -1, CR_DOVER);
1006
if(code == 200 && est_doc_attr(doc, ESTDATTRURI)){
1008
est_doc_slim(doc, waver->textlimit);
1009
kwords = est_morph_etch_doc(doc, waver->seedkeynum);
1012
/* apply the indexing rules in list order; a later matching rule overrides an earlier one */
for(i = 0; i < cblistnum(waver->pmrules); i++){
1013
pmrule = (PMRULE *)cblistval(waver->pmrules, i, NULL);
1014
switch(pmrule->index){
1016
if(est_regex_match(pmrule->regex, url)) allow = TRUE;
1019
if(est_regex_match(pmrule->regex, url)) allow = FALSE;
1023
if(allow && !est_doc_is_empty(doc)){
1024
est_doc_set_keywords(doc, kwords);
1025
kwords_reduce(est_doc_keywords(doc), waver->savekeynum, FALSE);
1026
/* register through the current node when one is selected, otherwise in the local index */
if(waver->curnode > 0){
1027
if(waver_node_put_doc(waver, doc, &code)){
1028
log_print(LL_DEBUG, "[%d]: registered: %s", thid, url);
1031
log_print(LL_ERROR, "[%d]: registration failed: %s: %d", thid, url, code);
1034
if(est_mtdb_put_doc(waver->index, doc, ESTPDCLEAN)){
1035
log_print(LL_DEBUG, "[%d]: registered: %s", thid, url);
1036
id = est_doc_id(doc);
1039
log_print(LL_ERROR, "[%d]: registration failed: %s: %s",
1040
thid, url, dperrmsg(dpecode));
1044
log_print(LL_DEBUG, "[%d]: not to be indexed: %s", thid, url);
1046
enqueuelinks(waver, url, links, kwords, myargs->depth, id, myargs->pid, myargs->psim);
1048
} else if(cblistnum(links) > 0){
1049
/* no document but links exist (logged as a redirect): the parent ID is passed as
   both the `id' and `pid' arguments */
enqueuelinks(waver, url, links, NULL,
1050
myargs->depth, myargs->pid, myargs->pid, myargs->psim);
1051
log_print(LL_INFO, "[%d]: redirected: %d: %s", thid, code, url);
1053
log_print(LL_INFO, "[%d]: ignored: %d: %s", thid, code, url);
1055
/* the document is not alive and the result was not 304: delete it from the nodes or
   from the local index */
if(!alive && code != 304){
1056
if(cbmaprnum(waver->nodes) > 0){
1057
if(waver_node_out_doc(waver, url, &code)){
1058
log_print(LL_DEBUG, "[%d]: deleted: %s", thid, url);
1060
/* a 400 result from the node is not reported as an error */
if(code != 400) log_print(LL_ERROR, "[%d]: deletion failed: %s: %d", thid, url, code);
1062
} else if((id = est_mtdb_uri_to_id(waver->index, url)) > 0){
1063
if(est_mtdb_out_doc(waver->index, id, ESTODCLEAN)){
1064
log_print(LL_DEBUG, "[%d]: deleted: %s", thid, url);
1066
log_print(LL_ERROR, "[%d]: deletion failed: %s: %s",
1067
thid, url, dperrmsg(dpecode));
1071
pthread_mutex_unlock(&g_mutex);
1073
log_print(LL_ERROR, "[%d]: could not get mutex", thid);
1078
/* with a post-processor configured, default to per-thread temporary file names under
   the root directory */
if(waver->postproc){
1079
dpath = cbsprintf("%s%c%s%c%08d.est",
1080
waver->rootdir, ESTPATHCHR, MYTMPDIR, ESTPATHCHR, thid);
1081
epath = cbsprintf("%s%c%s%c%08d.dat",
1082
waver->rootdir, ESTPATHCHR, MYTMPDIR, ESTPATHCHR, thid);
1084
/* configured save directories take the path derived from the URL instead */
if(waver->draftdir){
1086
dpath = urltosavepath(waver->draftdir, url);
1088
if(waver->entitydir){
1090
epath = urltosavepath(waver->entitydir, url);
1093
log_print(LL_DEBUG, "[%d]: saving: %s", thid, dpath);
1094
tmp = est_doc_dump_draft(doc);
1095
if(!cbwritefile(dpath, tmp, -1))
1096
log_print(LL_ERROR, "[%d]: saving failed: %s", thid, dpath);
1100
log_print(LL_DEBUG, "[%d]: saving: %s", thid, epath);
1101
if(!cbwritefile(epath, cbdatumptr(raw), cbdatumsize(raw)))
1102
log_print(LL_ERROR, "[%d]: saving failed: %s", thid, epath);
1104
/* NOTE(review): the command line is built by wrapping the paths in double quotes; the
   call that executes it is not visible here -- confirm that the path components
   cannot contain characters a shell would interpret */
if(waver->postproc){
1105
tmp = cbsprintf("%s \"%s\" \"%s\"", waver->postproc, dpath, epath);
1109
/* files that were only temporary (no save directory configured) are removed */
if(epath && !waver->entitydir) unlink(epath);
1110
if(dpath && !waver->draftdir) unlink(dpath);
1114
est_doc_delete(doc);
1126
/* get the saving path of a URL
   `savedir' is the directory that becomes the first part of the path.
   `url' is the document URL; its scheme prefix up to "://" is skipped and the rest is
   split on '/'.  A URL ending with '/' gets "index.html" as its final element.
   Empty elements are skipped; each remaining element is appended after ESTPATHCHR,
   and before every append est_mkdir and unlink are called on the path built so far.
   The return value is the buffer converted with cbdatumtomalloc, so the caller
   presumably owns it and must free it -- confirm.
   NOTE(review): elements such as ".." are not filtered in the visible code, and the
   escaping done by the "%?" directive of est_datum_printf is not visible here --
   confirm that a crafted URL cannot produce a path outside `savedir'. */
static char *urltosavepath(const char *savedir, const char *url){
1132
if((rp = strstr(url, "://")) != NULL) url = rp + 3;
1133
buf = cbdatumopen(NULL, -1);
1134
elems = cbsplit(url, -1, "/");
1135
if(cbstrbwmatch(url, "/")) cblistpush(elems, "index.html", -1);
1136
est_datum_printf(buf, "%s", savedir);
1137
for(i = 0; i < cblistnum(elems); i++){
1138
rp = cblistval(elems, i, NULL);
1139
if(rp[0] == '\0') continue;
1140
est_mkdir(cbdatumptr(buf));
1141
unlink(cbdatumptr(buf));
1142
est_datum_printf(buf, "%c%?", ESTPATHCHR, rp);
1145
return cbdatumtomalloc(buf, NULL);