~ubuntu-branches/ubuntu/karmic/gtk-gnutella/karmic

« back to all changes in this revision

Viewing changes to src/core/dq.c

  • Committer: Bazaar Package Importer
  • Author(s): Anand Kumria
  • Date: 2005-08-04 11:32:05 UTC
  • mfrom: (1.2.1 upstream) (2.1.1 sarge)
  • Revision ID: james.westby@ubuntu.com-20050804113205-q746i4lgo3rtlegn
Tags: 0.95.4-1
New upstream release.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
 * $Id: dq.c,v 1.25 2005/06/29 14:24:22 daichik Exp $
 
3
 *
 
4
 * Copyright (c) 2004, Raphael Manfredi
 
5
 *
 
6
 *----------------------------------------------------------------------
 
7
 * This file is part of gtk-gnutella.
 
8
 *
 
9
 *  gtk-gnutella is free software; you can redistribute it and/or modify
 
10
 *  it under the terms of the GNU General Public License as published by
 
11
 *  the Free Software Foundation; either version 2 of the License, or
 
12
 *  (at your option) any later version.
 
13
 *
 
14
 *  gtk-gnutella is distributed in the hope that it will be useful,
 
15
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 
16
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
17
 *  GNU General Public License for more details.
 
18
 *
 
19
 *  You should have received a copy of the GNU General Public License
 
20
 *  along with gtk-gnutella; if not, write to the Free Software
 
21
 *  Foundation, Inc.:
 
22
 *      59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
23
 *----------------------------------------------------------------------
 
24
 */
 
25
 
 
26
/**
 
27
 * @ingroup core
 
28
 * @file
 
29
 *
 
30
 * Dynamic querying.
 
31
 *
 
32
 * @author Raphael Manfredi
 
33
 * @date 2004
 
34
 */
 
35
 
 
36
#include "common.h"
 
37
 
 
38
RCSID("$Id: dq.c,v 1.25 2005/06/29 14:24:22 daichik Exp $");
 
39
 
 
40
#include <math.h>                       /* For pow() */
 
41
 
 
42
#include "dq.h"
 
43
#include "mq.h"
 
44
#include "gmsg.h"
 
45
#include "pmsg.h"
 
46
#include "gmsg.h"
 
47
#include "nodes.h"
 
48
#include "gnet_stats.h"
 
49
#include "qrp.h"
 
50
#include "vmsg.h"
 
51
#include "search.h"
 
52
#include "alive.h"
 
53
 
 
54
#include "if/gnet_property_priv.h"
 
55
 
 
56
#include "lib/atoms.h"
 
57
#include "lib/cq.h"
 
58
#include "lib/endian.h"
 
59
#include "lib/glib-missing.h"
 
60
#include "lib/misc.h"
 
61
#include "lib/walloc.h"
 
62
 
 
63
#include "lib/override.h"                       /* Must be the last header included */
 
64
 
 
65
#define DQ_MAX_LIFETIME         300000  /**< 5 minutes, in ms */
 
66
#define DQ_PROBE_TIMEOUT        1500    /**< 1.5 s extra per connection */
 
67
#define DQ_PENDING_TIMEOUT      1200    /**< 1.2 s extra per pending message */
 
68
#define DQ_QUERY_TIMEOUT        3700    /**< 3.7 s */
 
69
#define DQ_TIMEOUT_ADJUST       100             /**< 100 ms at each connection */
 
70
#define DQ_MIN_TIMEOUT          1500    /**< 1.5 s at least between queries */
 
71
#define DQ_LINGER_TIMEOUT       120000  /**< 2 minutes, in ms */
 
72
#define DQ_STATUS_TIMEOUT       30000   /**< 30 s, in ms, to reply to query status */
 
73
#define DQ_MAX_PENDING          3               /**< Max pending queries we allow */
 
74
 
 
75
#define DQ_LEAF_RESULTS         50              /**< # of results targetted for leaves */
 
76
#define DQ_LOCAL_RESULTS        150             /**< # of results for local queries */
 
77
#define DQ_SHA1_DECIMATOR       25              /**< Divide expected by that much for SHA1 */
 
78
#define DQ_PROBE_UP                     3               /**< Amount of UPs for initial probe */
 
79
#define DQ_MAX_HORIZON          500000  /**< Stop search after that many UP queried */
 
80
#define DQ_MIN_HORIZON          3000    /**< Min horizon before timeout adjustment */
 
81
#define DQ_LOW_RESULTS          10              /**< After DQ_MIN_HORIZON queried for adj. */
 
82
#define DQ_PERCENT_KEPT         5               /**< Assume 5% of results kept, worst case */
 
83
 
 
84
#define DQ_MAX_TTL                      5               /**< Max TTL we can use */
 
85
#define DQ_AVG_ULTRA_NODES      2               /**< Average # of ultranodes a leaf queries */
 
86
 
 
87
#define DQ_MQ_EPSILON           2048    /**< Queues identical at +/- 2K */
 
88
#define DQ_FUZZY_FACTOR         0.80    /**< Corrector for theoretical horizon */
 
89
 
 
90
/**
 
91
 * Compute start of search string (which is NUL terminated) in query.
 
92
 * The "+2" skips the "speed" field in the query.
 
93
 */
 
94
#define QUERY_TEXT(m)   ((m) + sizeof(struct gnutella_header) + 2)
 
95
 
 
96
/**
 
97
 * The dynamic query.
 
98
 */
 
99
typedef struct dquery {
 
100
        guint32 qid;                            /**< Unique query ID, to detect ghosts */
 
101
        guint32 node_id;                        /**< ID of the node that originated the query */
 
102
        guint32 flags;                          /**< Operational flags */
 
103
        gnet_search_t sh;                       /**< Search handle, if node ID = NODE_ID_LOCAL */
 
104
        pmsg_t *mb;                                     /**< The search messsage "template" */
 
105
        query_hashvec_t *qhv;           /**< Query hash vector for QRP filtering */
 
106
        GHashTable *queried;            /**< Contains node IDs that we queried so far */
 
107
        guint8 ttl;                                     /**< Initial query TTL */
 
108
        guint32 horizon;                        /**< Theoretical horizon reached thus far */
 
109
        guint32 up_sent;                        /**< # of UPs to which we really sent our query */
 
110
        guint32 pending;                        /**< Pending query messages not ACK'ed yet by mq */
 
111
        guint32 max_results;            /**< Max results we're targetting for */
 
112
        guint32 fin_results;            /**< # of results terminating leaf-guided query */
 
113
        guint32 oob_results;            /**< Amount of unclaimed OOB results reported */
 
114
        guint32 results;                        /**< Results we got so far for the query */
 
115
        guint32 linger_results;         /**< Results we got whilst lingering */
 
116
        guint32 new_results;            /**< New we got since last query status request */
 
117
        guint32 kept_results;           /**< Results they say they kept after filtering */
 
118
        guint32 result_timeout;         /**< The current timeout for getting results */
 
119
        gpointer expire_ev;                     /**< Callout queue global expiration event */
 
120
        gpointer results_ev;            /**< Callout queue results expiration event */
 
121
        gpointer alive;                         /**< Alive ping stats for computing timeouts */
 
122
        time_t start;                           /**< Time at which it started */
 
123
        time_t stop;                            /**< Time at which it was terminated */
 
124
        pmsg_t *by_ttl[DQ_MAX_TTL];     /**< Copied mesages, one for each TTL */
 
125
} dquery_t;
 
126
 
 
127
#define DQ_F_ID_CLEANING        0x00000001      /**< Cleaning the `by_node_id' table */
 
128
#define DQ_F_LINGER                     0x00000002      /**< Lingering to monitor extra results */
 
129
#define DQ_F_LEAF_GUIDED        0x00000004      /**< Leaf-guided query */
 
130
#define DQ_F_WAITING            0x00000008      /**< Waiting guidance reply from leaf */
 
131
#define DQ_F_GOT_GUIDANCE       0x00000010      /**< Got unsolicited leaf guidance */
 
132
#define DQ_F_USR_CANCELLED      0x00000020      /**< Explicitely cancelled by user */
 
133
#define DQ_F_EXITING            0x80000000      /**< Final cleanup at exit time */
 
134
 
 
135
/**
 
136
 * This table keeps track of all the dynamic query objects that we have
 
137
 * created and which are alive.
 
138
 */
 
139
static GHashTable *dqueries = NULL;
 
140
 
 
141
/**
 
142
 * This table keeps track of all the dynamic query objects created
 
143
 * for a given node ID.  The key is the node ID (converted to a pointer) and
 
144
 * the value is a GSList containing all the queries for that node.
 
145
 */
 
146
static GHashTable *by_node_id = NULL;
 
147
 
 
148
/**
 
149
 * This table keeps track of the association between a MUID and the
 
150
 * dynamic query, so that when results come back, we may account them
 
151
 * for the relevant query.
 
152
 *
 
153
 * The keys are MUIDs (GUID atoms), the values are the dquery_t object.
 
154
 */
 
155
static GHashTable *by_muid = NULL;
 
156
 
 
157
/**
 
158
 * Information about query messages sent.
 
159
 *
 
160
 * We can't really add too many fields to the pmsg_t blocks we enqueue.
 
161
 * However, what we do is we extend the pmsg_t to enrich them with a free
 
162
 * routine, and we use that fact to be notified by the message queue when
 
163
 * a message is freed.  We can then probe into the flags to see whether
 
164
 * it was sent.
 
165
 *
 
166
 * But adding a free routine is about as much as we can do with a generic
 
167
 * message system.  To be able to keep track of more information about the
 
168
 * queries we send, we associate each message with a structure containing
 
169
 * meta-information about it.
 
170
 */
 
171
struct pmsg_info {
 
172
        dquery_t *dq;                   /**< The dynamic query that sent the query */
 
173
        guint32 qid;                    /**< Query ID of the dynamic query */
 
174
        guint32 node_id;                /**< The ID of the node we sent it to */
 
175
        guint16 degree;                 /**< The advertised degree of the destination node */
 
176
        guint8 ttl;                             /**< The TTL used for that query */
 
177
};
 
178
 
 
179
/**
 
180
 * Structure produced by dq_fill_next_up, representing the nodes to which
 
181
 * we could send the query, along with routing information to be able to favor
 
182
 * UPs that report a QRP match early in the querying process.
 
183
 */
 
184
struct next_up {
 
185
        gnutella_node_t *node;  /**< Selected node */
 
186
        query_hashvec_t *qhv;   /**< Query hash vector for the query */
 
187
        gint can_route;                 /**< -1 = unknown, otherwise TRUE / FALSE */
 
188
        gint queue_pending;             /**< -1 = unknown, otherwise cached queue size */
 
189
};
 
190
 
 
191
/*
 
192
 * This table stores the pre-compution:
 
193
 *
 
194
 *  hosts(degree,ttl) = Sum[(degree-1)^i, 0 <= i <= ttl-1]
 
195
 *
 
196
 * For degree = 1 to 40 and ttl = 1 to 5.
 
197
 */
 
198
 
 
199
#define MAX_DEGREE              50
 
200
#define MAX_TTL                 5
 
201
 
 
202
static guint32 hosts[MAX_DEGREE][MAX_TTL];      /**< Pre-computed horizon */
 
203
 
 
204
static guint32 dyn_query_id = 0;
 
205
 
 
206
static void dq_send_next(dquery_t *dq);
 
207
static void dq_terminate(dquery_t *dq);
 
208
 
 
209
/**
 
210
 * Compute the hosts[] table so that:
 
211
 *
 
212
 *  hosts[i][j] = Sum[i^k, 0 <= k <= j]
 
213
 *
 
214
 * following the formula:
 
215
 *
 
216
 *  hosts(degree,ttl) = Sum[(degree-1)^i, 0 <= i <= ttl-1]
 
217
 */
 
218
static void
 
219
fill_hosts(void)
 
220
{
 
221
        gint i;
 
222
        gint j;
 
223
 
 
224
        for (i = 0; i < MAX_DEGREE; i++) {
 
225
                hosts[i][0] = 1;
 
226
                for (j = 1; j < MAX_TTL; j++) {
 
227
                        hosts[i][j] = hosts[i][j-1] + pow(i, j);
 
228
 
 
229
                        if (dq_debug > 19)
 
230
                                printf("horizon(degree=%d, ttl=%d) = %d\n",
 
231
                                        i+1, j+1, hosts[i][j]);
 
232
                }
 
233
        }
 
234
}
 
235
 
 
236
/**
 
237
 * Computes theoretical horizon reached by a query sent to a host advertising
 
238
 * a given degree if it is going to travel ttl hops.
 
239
 *
 
240
 * We adjust the horizon by DQ_FUZZY_FACTOR, assuming that at each hop there
 
241
 * is deperdition due to flow-control, network cycles, etc...
 
242
 */
 
243
static guint32
 
244
dq_get_horizon(gint degree, gint ttl)
 
245
{
 
246
        gint i;
 
247
        gint j;
 
248
 
 
249
        g_assert(degree > 0);
 
250
        g_assert(ttl > 0);
 
251
 
 
252
        i = MIN(degree, MAX_DEGREE) - 1;
 
253
        j = MIN(ttl, MAX_TTL) - 1;
 
254
 
 
255
        return hosts[i][j] * pow(DQ_FUZZY_FACTOR, j);
 
256
}
 
257
 
 
258
/**
 
259
 * Compute amount of results "kept" for the query, if we have this
 
260
 * information available.
 
261
 */
 
262
static guint32
 
263
dq_kept_results(dquery_t *dq)
 
264
{
 
265
        /*
 
266
         * For local queries, see how many results we kept so far.
 
267
         *
 
268
         * Since there's no notification for local queries about the
 
269
         * amount of results kept (no "Query Status Results" messages)
 
270
         * update the amount now.
 
271
         */
 
272
 
 
273
        if (dq->node_id == NODE_ID_LOCAL)
 
274
                return dq->kept_results = search_get_kept_results_by_handle(dq->sh);
 
275
 
 
276
        /*
 
277
         * We artificially reduce the kept results by a factor of
 
278
         * DQ_AVG_ULTRA_NODES since the leaf node will report the total
 
279
         * number of hits it got and kept from the other ultrapeers it is
 
280
         * querying, and we assume it filtered out about the same proportion
 
281
         * of hits everywhere.
 
282
         */
 
283
 
 
284
        return (dq->flags & DQ_F_LEAF_GUIDED) ?
 
285
                (dq->kept_results / DQ_AVG_ULTRA_NODES) : dq->results;
 
286
}
 
287
 
 
288
/**
 
289
 * Select the proper TTL for the next query we're going to send to the
 
290
 * specified node, assuming hosts are equally split among the remaining
 
291
 * connections we have yet to query.
 
292
 */
 
293
static gint
 
294
dq_select_ttl(dquery_t *dq, gnutella_node_t *node, gint connections)
 
295
{
 
296
        guint32 needed;
 
297
        guint32 results;
 
298
        gdouble results_per_up;
 
299
        gdouble hosts_to_reach;
 
300
        gdouble hosts_to_reach_via_node;
 
301
        gint ttl;
 
302
 
 
303
        g_assert(connections > 0);
 
304
 
 
305
        results = dq_kept_results(dq);
 
306
        needed = dq->max_results - results;
 
307
 
 
308
        g_assert(needed > 0);           /* Or query would have been stopped */
 
309
 
 
310
        results_per_up = dq->results / MAX(dq->horizon, 1);
 
311
        hosts_to_reach = (gdouble) needed / MAX(results_per_up, (gdouble) 0.000001);
 
312
        hosts_to_reach_via_node = hosts_to_reach / (gdouble) connections;
 
313
 
 
314
        /*
 
315
         * Now iteratively find the TTL needed to reach the desired number
 
316
         * of hosts, rounded to the lowest TTL to be conservative.
 
317
         */
 
318
 
 
319
        for (ttl = MIN(node->max_ttl, dq->ttl); ttl > 0; ttl--) {
 
320
                if (dq_get_horizon(node->degree, ttl) <= hosts_to_reach_via_node)
 
321
                        break;
 
322
        }
 
323
 
 
324
        if (ttl == 0)
 
325
                ttl = MIN(node->max_ttl, dq->ttl);
 
326
 
 
327
        g_assert(ttl > 0);
 
328
 
 
329
        return ttl;
 
330
}
 
331
 
 
332
/**
 
333
 * Create a pmsg_info structure, giving meta-information about the message
 
334
 * we're about to send.
 
335
 *
 
336
 * @param dq      DOCUMENT THIS!
 
337
 * @param degree  the degree of the node to which the message is sent
 
338
 * @param ttl     the TTL at which the message is sent
 
339
 * @param node_id the ID of the node to which we send the message
 
340
 */
 
341
static struct pmsg_info *
 
342
dq_pmi_alloc(dquery_t *dq, guint16 degree, guint8 ttl, guint32 node_id)
 
343
{
 
344
        struct pmsg_info *pmi;
 
345
 
 
346
        pmi = walloc(sizeof(*pmi));
 
347
 
 
348
        pmi->dq = dq;
 
349
        pmi->qid = dq->qid;
 
350
        pmi->degree = degree;
 
351
        pmi->ttl = ttl;
 
352
        pmi->node_id = node_id;
 
353
 
 
354
        return pmi;
 
355
}
 
356
 
 
357
/**
 
358
 * Get rid of the pmsg_info structure.
 
359
 */
 
360
static void
 
361
dq_pmi_free(struct pmsg_info *pmi)
 
362
{
 
363
        wfree(pmi, sizeof(*pmi));
 
364
}
 
365
 
 
366
/**
 
367
 * Check whether query bearing the specified ID is still alive and has
 
368
 * not been cancelled yet.
 
369
 */
 
370
static gboolean
 
371
dq_alive(dquery_t *dq, guint32 qid)
 
372
{
 
373
        if (!g_hash_table_lookup(dqueries, dq))
 
374
                return FALSE;
 
375
 
 
376
        return dq->qid == qid;          /* In case it reused the same address */
 
377
}
 
378
 
 
379
/**
 
380
 * Free routine for an extended message block.
 
381
 */
 
382
static void
 
383
dq_pmsg_free(pmsg_t *mb, gpointer arg)
 
384
{
 
385
        struct pmsg_info *pmi = (struct pmsg_info *) arg;
 
386
        dquery_t *dq = pmi->dq;
 
387
 
 
388
        g_assert(pmsg_is_extended(mb));
 
389
 
 
390
        /*
 
391
         * It is possible that whilst the message was in the message queue,
 
392
         * the dynamic query was cancelled.  Therefore, we need to ensure that
 
393
         * the recorded query is still alive.  We use both the combination of
 
394
         * a hash table and a unique ID in case the address of an old dquery_t
 
395
         * object is reused later.
 
396
         */
 
397
 
 
398
        if (!dq_alive(dq, pmi->qid))
 
399
                goto cleanup;
 
400
 
 
401
        g_assert(dq->pending > 0);
 
402
 
 
403
        dq->pending--;
 
404
 
 
405
        if (!pmsg_was_sent(mb)) {
 
406
                gpointer key;
 
407
                gpointer value;
 
408
                gboolean found;
 
409
 
 
410
                /*
 
411
                 * The message was not sent: we need to remove the entry for the
 
412
                 * node in the "dq->queried" structure, since the message did not
 
413
                 * make it through the network.
 
414
                 */
 
415
 
 
416
                found = g_hash_table_lookup_extended(dq->queried,
 
417
                                        GUINT_TO_POINTER(pmi->node_id), &key, &value);
 
418
 
 
419
                g_assert(found);                /* Or something is seriously corrupted */
 
420
 
 
421
                g_hash_table_remove(dq->queried, GUINT_TO_POINTER(pmi->node_id));
 
422
 
 
423
                if (dq_debug > 19)
 
424
                        printf("DQ[%d] %snode #%d degree=%d dropped message TTL=%d\n",
 
425
                                dq->qid, dq->node_id == NODE_ID_LOCAL ? "(local) " : "",
 
426
                                pmi->node_id, pmi->degree, pmi->ttl);
 
427
 
 
428
                /*
 
429
                 * If we don't have any more pending message and we're waiting
 
430
                 * for results, chances are we're going to wait for nothing!
 
431
                 *
 
432
                 * We can't re-enter mq from here, so reschedule the event for
 
433
                 * immediate delivery (in 1 ms, since we can't say 0).
 
434
                 */
 
435
 
 
436
                if (0 == dq->pending && dq->results_ev)
 
437
                        cq_resched(callout_queue, dq->results_ev, 1);
 
438
 
 
439
        } else {
 
440
                /*
 
441
                 * The message was sent.  Adjust the total horizon reached thus far.
 
442
                 * Record that this UP got the query.
 
443
                 */
 
444
 
 
445
                dq->horizon += dq_get_horizon(pmi->degree, pmi->ttl);
 
446
                dq->up_sent++;
 
447
 
 
448
                if (dq_debug > 19) {
 
449
                        printf("DQ[%d] %snode #%d degree=%d sent message TTL=%d\n",
 
450
                                dq->qid, dq->node_id == NODE_ID_LOCAL ? "(local) " : "",
 
451
                                pmi->node_id, pmi->degree, pmi->ttl);
 
452
                        printf("DQ[%d] %s(%d secs) queried %d UP%s, "
 
453
                                "horizon=%d, results=%d\n",
 
454
                                dq->qid, dq->node_id == NODE_ID_LOCAL ? "local " : "",
 
455
                                (gint) (time(NULL) - dq->start),
 
456
                                dq->up_sent, dq->up_sent == 1 ? "" :"s",
 
457
                                dq->horizon, dq->results);
 
458
                }
 
459
        }
 
460
 
 
461
cleanup:
 
462
        dq_pmi_free(pmi);
 
463
}
 
464
 
 
465
/**
 
466
 * Fetch message for a given TTL.
 
467
 * If no such message exists yet, create it from the "template" message.
 
468
 */
 
469
static pmsg_t *
 
470
dq_pmsg_by_ttl(dquery_t *dq, gint ttl)
 
471
{
 
472
        pmsg_t *mb;
 
473
        pmsg_t *t;
 
474
        pdata_t *db;
 
475
        gint len;
 
476
 
 
477
        g_assert(ttl > 0 && ttl <= DQ_MAX_TTL);
 
478
 
 
479
        mb = dq->by_ttl[ttl - 1];
 
480
        if (mb != NULL)
 
481
                return mb;
 
482
 
 
483
        /*
 
484
         * Copy does not exist for this TTL.
 
485
         *
 
486
         * First, create the data buffer, and copy the data from the
 
487
         * template to this new buffer.  We assume the original message
 
488
         * is made of one data buffer only (no data block chaining yet).
 
489
         */
 
490
 
 
491
        t = dq->mb;                                     /* Our "template" */
 
492
        len = pmsg_size(t);
 
493
        db = pdata_new(len);
 
494
        memcpy(pdata_start(db), pmsg_start(t), len);
 
495
 
 
496
        /*
 
497
         * Patch the TTL in the new data buffer.
 
498
         */
 
499
 
 
500
        ((struct gnutella_header *) pdata_start(db))->ttl = ttl;
 
501
 
 
502
        /*
 
503
         * Now create a message for this data buffer and save it for later perusal.
 
504
         */
 
505
 
 
506
        mb = pmsg_alloc(pmsg_prio(t), db, 0, len);
 
507
        dq->by_ttl[ttl - 1] = mb;
 
508
        gmsg_install_presend(mb);
 
509
 
 
510
        return mb;
 
511
}
 
512
 
 
513
/**
 
514
 * Fill node vector with UP hosts to which we could send our probe query.
 
515
 *
 
516
 * @param dq     DOCUMENT THIS!
 
517
 * @param nv     the pre-allocated node vector
 
518
 * @param ncount the size of the vector
 
519
 *
 
520
 * @return amount of nodes we found.
 
521
 */
 
522
static gint
 
523
dq_fill_probe_up(dquery_t *dq, gnutella_node_t **nv, gint ncount)
 
524
{
 
525
        const GSList *sl;
 
526
        gint i = 0;
 
527
 
 
528
        for (sl = node_all_nodes(); i < ncount && sl; sl = g_slist_next(sl)) {
 
529
                struct gnutella_node *n = (struct gnutella_node *) sl->data;
 
530
 
 
531
                if (!NODE_IS_ULTRA(n))
 
532
                        continue;
 
533
 
 
534
                /*
 
535
                 * Skip node if we haven't received the handshaking ping yet.
 
536
                 */
 
537
 
 
538
                if (n->received == 0)
 
539
                        continue;
 
540
 
 
541
                /*
 
542
                 * Skip node if we're in TX flow-control (query will likely not
 
543
                 * be transmitted before the next timeout, and it could even be
 
544
                 * dropped) or if we're remotely flow-controlled (no queries to
 
545
                 * be sent for now).
 
546
                 */
 
547
 
 
548
                if (NODE_IN_TX_FLOW_CONTROL(n) || n->hops_flow == 0)
 
549
                        continue;
 
550
 
 
551
                if (!qrp_node_can_route(n, dq->qhv))
 
552
                        continue;
 
553
 
 
554
                g_assert(NODE_IS_WRITABLE(n));  /* Checked by qrp_node_can_route() */
 
555
 
 
556
                nv[i++] = n;            /* Node or one of its leaves could answer */
 
557
        }
 
558
 
 
559
        return i;
 
560
}
 
561
 
 
562
/**
 
563
 * Fill node vector with UP hosts to which we could send our next query.
 
564
 *
 
565
 * @param dq DOCUMENT THIS!
 
566
 * @param nv the pre-allocated node vector
 
567
 * @param ncount the size of the vector
 
568
 *
 
569
 * @return amount of nodes we found.
 
570
 */
 
571
static gint
 
572
dq_fill_next_up(dquery_t *dq, struct next_up *nv, gint ncount)
 
573
{
 
574
        const GSList *sl;
 
575
        gint i = 0;
 
576
 
 
577
        for (sl = node_all_nodes(); i < ncount && sl; sl = g_slist_next(sl)) {
 
578
                struct gnutella_node *n = (struct gnutella_node *) sl->data;
 
579
                struct next_up *nup;
 
580
 
 
581
                if (!NODE_IS_ULTRA(n) || !NODE_IS_WRITABLE(n))
 
582
                        continue;
 
583
 
 
584
                /*
 
585
                 * Skip node if we haven't received the handshaking ping yet
 
586
                 * or if we already queried it.
 
587
                 */
 
588
 
 
589
                if (n->received == 0)
 
590
                        continue;
 
591
 
 
592
                if (g_hash_table_lookup(dq->queried, GUINT_TO_POINTER(n->id)))
 
593
                        continue;
 
594
 
 
595
                /*
 
596
                 * Skip node if we're in TX flow-control (query will likely not
 
597
                 * be transmitted before the next timeout, and it could even be
 
598
                 * dropped) or if we're remotely flow-controlled (no queries to
 
599
                 * be sent for now).
 
600
                 */
 
601
 
 
602
                if (NODE_IN_TX_FLOW_CONTROL(n) || n->hops_flow == 0)
 
603
                        continue;
 
604
 
 
605
                nup = &nv[i++];
 
606
 
 
607
                nup->node = n;
 
608
                nup->qhv = dq->qhv;
 
609
                nup->can_route = -1;            /* We don't know yet */
 
610
        }
 
611
 
 
612
        return i;
 
613
}
 
614
 
 
615
/**
 
616
 * Forward message to all the leaves but the one originating this query,
 
617
 * according to their QRP tables.
 
618
 *
 
619
 * @attention
 
620
 * NB: In order to avoid qrt_build_query_target() selecting neighbouring
 
621
 * ultra nodes that support last-hop QRP, we ensure the TTL is NOT 1.
 
622
 * This is why we somehow duplicate qrt_route_query() here.
 
623
 */
 
624
static void
 
625
dq_sendto_leaves(dquery_t *dq, gnutella_node_t *source)
 
626
{
 
627
        GSList *nodes;
 
628
        gchar *payload;
 
629
        struct gnutella_header *head;
 
630
 
 
631
        payload = pmsg_start(dq->mb);
 
632
        head = (struct gnutella_header *) payload;
 
633
 
 
634
        nodes = qrt_build_query_target(dq->qhv,
 
635
                head->hops, MAX(head->ttl, 2), source);
 
636
 
 
637
        if (dq_debug > 4)
 
638
                g_message("DQ QRP %s (%d word/hash) forwarded to %d/%d leaves",
 
639
                        gmsg_infostr_full(payload), dq->qhv->count, g_slist_length(nodes),
 
640
                        node_leaf_count);
 
641
 
 
642
        gmsg_mb_sendto_all(nodes, dq->mb);
 
643
 
 
644
        g_slist_free(nodes);
 
645
}
 
646
 
 
647
/**
 
648
 * Release the dynamic query object.
 
649
 */
 
650
static void
 
651
dq_free(dquery_t *dq)
 
652
{
 
653
        gint i;
 
654
        gboolean found;
 
655
        gpointer key;
 
656
        gpointer value;
 
657
        struct gnutella_header *head;
 
658
 
 
659
        g_assert(dq != NULL);
 
660
        g_assert(g_hash_table_lookup(dqueries, dq));
 
661
 
 
662
        if (dq_debug > 19)
 
663
                printf("DQ[%d] %s(%d secs; +%d secs) node #%d ending: "
 
664
                        "ttl=%d, queried=%d, horizon=%d, results=%d+%d\n",
 
665
                        dq->qid, dq->node_id == NODE_ID_LOCAL ? "local " : "",
 
666
                        (gint) (time(NULL) - dq->start),
 
667
                        (dq->flags & DQ_F_LINGER) ? (gint) (time(NULL) - dq->stop) : 0,
 
668
                        dq->node_id, dq->ttl, dq->up_sent, dq->horizon, dq->results,
 
669
                        dq->linger_results);
 
670
 
 
671
        if (dq->results_ev)
 
672
                cq_cancel(callout_queue, dq->results_ev);
 
673
 
 
674
        if (dq->expire_ev)
 
675
                cq_cancel(callout_queue, dq->expire_ev);
 
676
 
 
677
        if (dq->results >= dq->max_results)
 
678
                gnet_stats_count_general(GNR_DYN_QUERIES_COMPLETED_FULL, 1);
 
679
        else if (dq->results > 0)
 
680
                gnet_stats_count_general(GNR_DYN_QUERIES_COMPLETED_PARTIAL, 1);
 
681
        else
 
682
                gnet_stats_count_general(GNR_DYN_QUERIES_COMPLETED_ZERO, 1);
 
683
 
 
684
        if (dq->linger_results) {
 
685
                if (dq->results >= dq->max_results)
 
686
                        gnet_stats_count_general(GNR_DYN_QUERIES_LINGER_EXTRA, 1);
 
687
                else if (dq->results + dq->linger_results >= dq->max_results)
 
688
                        gnet_stats_count_general(GNR_DYN_QUERIES_LINGER_COMPLETED, 1);
 
689
                else
 
690
                        gnet_stats_count_general(GNR_DYN_QUERIES_LINGER_RESULTS, 1);
 
691
        }
 
692
 
 
693
        g_hash_table_destroy(dq->queried);
 
694
 
 
695
        qhvec_free(dq->qhv);
 
696
 
 
697
        for (i = 0; i < DQ_MAX_TTL; i++) {
 
698
                if (dq->by_ttl[i] != NULL)
 
699
                        pmsg_free(dq->by_ttl[i]);
 
700
        }
 
701
 
 
702
        if (!(dq->flags & DQ_F_EXITING))
 
703
                g_hash_table_remove(dqueries, dq);
 
704
 
 
705
        /*
 
706
         * Remove query from the `by_node_id' table but only if the node ID
 
707
         * is not the local node, since we don't store our own queries in
 
708
         * there: if we disappear, everything else will!
 
709
         *
 
710
         * Also, if the DQ_F_ID_CLEANING flag is set, then someone is already
 
711
         * cleaning up the `by_node_id' table for us, so we really must not
 
712
         * mess with the table ourselves.
 
713
         */
 
714
 
 
715
        if (
 
716
                dq->node_id != NODE_ID_LOCAL &&
 
717
                !(dq->flags & DQ_F_ID_CLEANING)
 
718
        ) {
 
719
                GSList *list;
 
720
 
 
721
                found = g_hash_table_lookup_extended(by_node_id,
 
722
                                        GUINT_TO_POINTER(dq->node_id), &key, &value);
 
723
 
 
724
                g_assert(found);
 
725
 
 
726
                list = value;
 
727
                list = g_slist_remove(list, dq);
 
728
 
 
729
                if (list == NULL) {
 
730
                        /* Last item removed, get rid of the entry */
 
731
                        g_hash_table_remove(by_node_id, key);
 
732
                } else if (list != value)
 
733
                        g_hash_table_insert(by_node_id, key, list);
 
734
        }
 
735
 
 
736
        /*
 
737
         * Remove query's MUID.
 
738
         */
 
739
 
 
740
        head = (struct gnutella_header *) pmsg_start(dq->mb);
 
741
        found = g_hash_table_lookup_extended(by_muid, head->muid, &key, &value);
 
742
 
 
743
        if (found) {                    /* Could be missing if a MUID conflict occurred */
 
744
                if (value == dq) {      /* Make sure it's for us in case of conflicts */
 
745
                        g_hash_table_remove(by_muid, key);
 
746
                        atom_guid_free(key);
 
747
                }
 
748
        }
 
749
 
 
750
        pmsg_free(dq->mb);                      /* Now that we used the MUID */
 
751
 
 
752
        wfree(dq, sizeof(*dq));
 
753
}
 
754
 
 
755
/**
 
756
 * Callout queue callback invoked when the dynamic query has expired.
 
757
 */
 
758
static void
 
759
dq_expired(cqueue_t *unused_cq, gpointer obj)
 
760
{
 
761
        dquery_t *dq = (dquery_t *) obj;
 
762
 
 
763
        (void) unused_cq;
 
764
 
 
765
        if (dq_debug > 19)
 
766
                printf("DQ[%d] expired\n", dq->qid);
 
767
 
 
768
        dq->expire_ev = NULL;   /* Indicates callback fired */
 
769
 
 
770
        /*
 
771
         * If query was lingering, free it.
 
772
         */
 
773
 
 
774
        if (dq->flags & DQ_F_LINGER) {
 
775
                dq_free(dq);
 
776
                return;
 
777
        }
 
778
 
 
779
        /*
 
780
         * Put query in lingering mode, to be able to monitor extra results
 
781
         * that come back after we stopped querying.
 
782
         */
 
783
 
 
784
        if (dq->results_ev) {
 
785
                cq_cancel(callout_queue, dq->results_ev);
 
786
                dq->results_ev = NULL;
 
787
        }
 
788
 
 
789
        dq_terminate(dq);
 
790
}
 
791
 
 
792
/**
 
793
 * Callout queue callback invoked when the result timer has expired.
 
794
 */
 
795
static void
 
796
dq_results_expired(cqueue_t *unused_cq, gpointer obj)
 
797
{
 
798
        dquery_t *dq = (dquery_t *) obj;
 
799
        gnutella_node_t *n;
 
800
        struct gnutella_header *head;
 
801
        gint timeout;
 
802
        guint32 avg;
 
803
        guint32 last;
 
804
 
 
805
        (void) unused_cq;
 
806
        g_assert(!(dq->flags & DQ_F_LINGER));
 
807
 
 
808
        dq->results_ev = NULL;  /* Indicates callback fired */
 
809
 
 
810
        /*
 
811
         * If we were waiting for a status reply from the queryier, well, we
 
812
         * just timed-out.  Cancel this query.
 
813
         */
 
814
 
 
815
        if (dq->flags & DQ_F_WAITING) {
 
816
                if (dq_debug > 19)
 
817
                        printf("DQ[%d] (%d secs) timeout waiting for status results\n",
 
818
                                dq->qid, (gint) (time(NULL) - dq->start));
 
819
                dq->flags &= ~DQ_F_WAITING;
 
820
                dq_terminate(dq);
 
821
                return;
 
822
        }
 
823
 
 
824
        /*
 
825
         * If host does not support leaf-guided queries, proceed to next ultra.
 
826
         * If we got unsolicited guidance info whilst we were waiting for
 
827
         * results to come back, also proceed.
 
828
         *
 
829
         * For local queries, DQ_F_LEAF_GUIDED is not set, so we'll continue
 
830
         * anyway.
 
831
         */
 
832
 
 
833
        if (!(dq->flags & DQ_F_LEAF_GUIDED) || (dq->flags & DQ_F_GOT_GUIDANCE)) {
 
834
                dq_send_next(dq);
 
835
                return;
 
836
        }
 
837
 
 
838
        g_assert(dq->node_id != NODE_ID_LOCAL);
 
839
        g_assert(dq->alive != NULL);
 
840
 
 
841
        /*
 
842
         * Ask queryier how many hits it kept so far.
 
843
         */
 
844
 
 
845
        n = node_active_by_id(dq->node_id);
 
846
 
 
847
        if (n == NULL) {
 
848
                if (dq_debug > 19)
 
849
                        printf("DQ[%d] (%d secs) node #%d appears to be dead\n",
 
850
                                dq->qid, (gint) (time(NULL) - dq->start), dq->node_id);
 
851
                dq_free(dq);
 
852
                return;
 
853
        }
 
854
 
 
855
        if (dq_debug > 19)
 
856
                printf("DQ[%d] (%d secs) requesting node #%d for status (kept=%u)\n",
 
857
                        dq->qid, (gint) (time(NULL) - dq->start), dq->node_id,
 
858
                        dq->kept_results);
 
859
 
 
860
        dq->flags |= DQ_F_WAITING;
 
861
        head = (struct gnutella_header *) pmsg_start(dq->mb);
 
862
 
 
863
        vmsg_send_qstat_req(n, head->muid);
 
864
 
 
865
        /*
 
866
         * Compute the timout using the available ping-pong round-trip
 
867
         * statistics.
 
868
         */
 
869
 
 
870
        alive_get_roundtrip_ms(dq->alive, &avg, &last);
 
871
        timeout = (avg + last) / 2000;          /* An average, converted to seconds */
 
872
        timeout = MAX(timeout, DQ_STATUS_TIMEOUT);
 
873
 
 
874
        if (dq_debug > 19)
 
875
                printf("DQ[%d] status reply timeout set to %d s\n", dq->qid, timeout);
 
876
 
 
877
        dq->results_ev = cq_insert(callout_queue, timeout,
 
878
                dq_results_expired, dq);
 
879
}
 
880
 
 
881
/**
 
882
 * Terminate active querying.
 
883
 */
 
884
static void
 
885
dq_terminate(dquery_t *dq)
 
886
{
 
887
        g_assert(!(dq->flags & DQ_F_LINGER));
 
888
        g_assert(dq->results_ev == NULL);
 
889
 
 
890
        /*
 
891
         * Put the query in lingering mode, so we can continue to monitor
 
892
         * results for some time after we stopped the dynamic querying.
 
893
         */
 
894
 
 
895
        if (dq->expire_ev != NULL)
 
896
                cq_resched(callout_queue, dq->expire_ev, DQ_LINGER_TIMEOUT);
 
897
        else
 
898
                dq->expire_ev = cq_insert(callout_queue, DQ_LINGER_TIMEOUT,
 
899
                        dq_expired, dq);
 
900
 
 
901
        dq->flags &= ~DQ_F_WAITING;
 
902
        dq->flags |= DQ_F_LINGER;
 
903
        dq->stop = time(NULL);
 
904
 
 
905
        if (dq_debug > 19)
 
906
                printf("DQ[%d] (%d secs) node #%d lingering: "
 
907
                        "ttl=%d, queried=%d, horizon=%d, results=%d\n",
 
908
                        dq->qid, (gint) (time(NULL) - dq->start), dq->node_id,
 
909
                        dq->ttl, dq->up_sent, dq->horizon, dq->results);
 
910
}
 
911
 
 
912
/**
 
913
 * qsort() callback for sorting nodes by increasing queue size.
 
914
 */
 
915
static gint
 
916
node_mq_cmp(const void *np1, const void *np2)
 
917
{
 
918
        gnutella_node_t *n1 = *(gnutella_node_t **) np1;
 
919
        gnutella_node_t *n2 = *(gnutella_node_t **) np2;
 
920
        gint qs1 = NODE_MQUEUE_PENDING(n1);
 
921
        gint qs2 = NODE_MQUEUE_PENDING(n2);
 
922
 
 
923
        /*
 
924
         * We don't cache the results of NODE_MQUEUE_PENDING() like we do in
 
925
         * node_mq_qrp_cmp() because this is done ONCE per each dynamic query,
 
926
         * (for the probe query only, and on an array containing only UP with
 
927
         * a matching QRP) whereas the other comparison routine is called for
 
928
         * each subsequent UP selection...
 
929
         */
 
930
 
 
931
        return CMP(qs1, qs2);
 
932
}
 
933
 
 
934
/**
 
935
 * qsort() callback for sorting nodes by increasing queue size, with a
 
936
 * preference towards nodes that have a QRP match.
 
937
 */
 
938
static gint
 
939
node_mq_qrp_cmp(const void *np1, const void *np2)
 
940
{
 
941
        struct next_up *nu1 = (struct next_up *) np1;
 
942
        struct next_up *nu2 = (struct next_up *) np2;
 
943
        gnutella_node_t *n1 = nu1->node;
 
944
        gnutella_node_t *n2 = nu2->node;
 
945
        gint qs1 = nu1->queue_pending;
 
946
        gint qs2 = nu2->queue_pending;
 
947
 
 
948
        /*
 
949
         * Cache the results of NODE_MQUEUE_PENDING() since it involves
 
950
         * several function calls to go down to the link layer buffers.
 
951
         */
 
952
 
 
953
        if (qs1 == -1)
 
954
                qs1 = nu1->queue_pending = NODE_MQUEUE_PENDING(n1);
 
955
        if (qs2 == -1)
 
956
                qs2 = nu2->queue_pending = NODE_MQUEUE_PENDING(n2);
 
957
 
 
958
        /*
 
959
         * If queue sizes are rather identical, compare based on whether
 
960
         * the node can route or not (i.e. whether it advertises a "match"
 
961
         * in its QRP table).
 
962
         *
 
963
         * Since this determination is a rather costly operation, cache it.
 
964
         */
 
965
 
 
966
        if (ABS(qs1 - qs2) < DQ_MQ_EPSILON) {
 
967
                if (nu1->can_route == -1)
 
968
                        nu1->can_route = qrp_node_can_route(n1, nu1->qhv);
 
969
                if (nu2->can_route == -1)
 
970
                        nu2->can_route = qrp_node_can_route(n2, nu2->qhv);
 
971
 
 
972
                if (!nu1->can_route == !nu2->can_route) {
 
973
                        /* Both can equally route or not route */
 
974
                        return CMP(qs1, qs2);
 
975
                }
 
976
 
 
977
                return nu1->can_route ? -1 : +1;
 
978
        }
 
979
 
 
980
        return qs1 < qs2 ? -1 : +1;
 
981
}
 
982
 
 
983
/**
 
984
 * Send individual query to selected node at the supplied TTL.
 
985
 * If the node advertises a lower maximum TTL, the supplied TTL is
 
986
 * adjusted down accordingly.
 
987
 */
 
988
static void
 
989
dq_send_query(dquery_t *dq, gnutella_node_t *n, gint ttl)
 
990
{
 
991
        struct pmsg_info *pmi;
 
992
        pmsg_t *mb;
 
993
 
 
994
        g_assert(!g_hash_table_lookup(dq->queried, GUINT_TO_POINTER(n->id)));
 
995
        g_assert(NODE_IS_WRITABLE(n));
 
996
 
 
997
        g_hash_table_insert(dq->queried,
 
998
                GUINT_TO_POINTER(n->id), GINT_TO_POINTER(1));
 
999
 
 
1000
        pmi = dq_pmi_alloc(dq, n->degree, MIN(n->max_ttl, ttl), n->id);
 
1001
 
 
1002
        /*
 
1003
         * Now for the magic...
 
1004
         *
 
1005
         * We're going to clone the messsage template into an extended one,
 
1006
         * which will be associated with a free routine.  That way, we'll know
 
1007
         * when the message is freed, and we'll get back the meta data (pmsg_info)
 
1008
         * as an argument to the free routine.
 
1009
         *
 
1010
         * Then, in the cloned message, adjust the TTL before sending.
 
1011
         */
 
1012
 
 
1013
        mb = dq_pmsg_by_ttl(dq, pmi->ttl);
 
1014
        mb = pmsg_clone_extend(mb, dq_pmsg_free, pmi);
 
1015
 
 
1016
        if (dq_debug > 19)
 
1017
                printf("DQ[%d] (%d secs) queuing ttl=%d to #%d %s <%s> Q=%d bytes\n",
 
1018
                        dq->qid, (gint) delta_time(time(NULL), dq->start),
 
1019
                        pmi->ttl, n->id, node_ip(n), node_vendor(n),
 
1020
                        (gint) NODE_MQUEUE_PENDING(n));
 
1021
 
 
1022
        dq->pending++;
 
1023
        gmsg_mb_sendto_one(n, mb);
 
1024
}
 
1025
 
 
1026
/**
 
1027
 * Iterate over the UPs which have not seen our query yet, select one and
 
1028
 * send it the query.
 
1029
 *
 
1030
 * If no more UP remain, terminate this query.
 
1031
 */
 
1032
static void
 
1033
dq_send_next(dquery_t *dq)
 
1034
{
 
1035
        struct next_up *nv;
 
1036
        gint ncount = max_connections;
 
1037
        gint found;
 
1038
        gnutella_node_t *node;
 
1039
        gint ttl;
 
1040
        gint timeout;
 
1041
        gint i;
 
1042
        gboolean sent = FALSE;
 
1043
        guint32 results;
 
1044
 
 
1045
        g_assert(dq->results_ev == NULL);
 
1046
 
 
1047
        /*
 
1048
         * Terminate query immediately if we're no longer an UP.
 
1049
         */
 
1050
 
 
1051
        if (current_peermode != NODE_P_ULTRA) {
 
1052
                if (dq_debug > 19)
 
1053
                        printf("DQ[%d] terminating (no longer an ultra node)\n", dq->qid);
 
1054
                goto terminate;
 
1055
        }
 
1056
 
 
1057
        dq->flags &= ~DQ_F_GOT_GUIDANCE;        /* Clear flag */
 
1058
 
 
1059
        /*
 
1060
         * Terminate query if we reached the amount of results we wanted or
 
1061
         * if we reached the maximum theoretical horizon.
 
1062
         */
 
1063
 
 
1064
        results = dq_kept_results(dq);
 
1065
 
 
1066
        if (dq->horizon >= DQ_MAX_HORIZON || results >= dq->max_results) {
 
1067
                if (dq_debug > 19)
 
1068
                        printf("DQ[%d] terminating (horizon=%u >= %d, results=%u >= %u)\n",
 
1069
                                dq->qid, dq->horizon, DQ_MAX_HORIZON, results, dq->max_results);
 
1070
                goto terminate;
 
1071
        }
 
1072
 
 
1073
        /*
 
1074
         * Even if the query is leaf-guided, they have to keep some amount
 
1075
         * of results, or we're wasting our energy collecting results for
 
1076
         * something that has too restrictives filters.
 
1077
         *
 
1078
         * If they don't do leaf-guidance, the above test will trigger first!
 
1079
         */
 
1080
 
 
1081
        if (dq->results + dq->oob_results > dq->fin_results) {
 
1082
                if (dq_debug > 19)
 
1083
                        printf("DQ[%d] terminating (seen=%u + OOB=%u >= %u -- kept=%u)\n",
 
1084
                                dq->qid, dq->results, dq->oob_results, dq->fin_results,
 
1085
                                results);
 
1086
                goto terminate;
 
1087
        }
 
1088
 
 
1089
        /*
 
1090
         * If we already queried as many UPs as the maximum we configured,
 
1091
         * stop the query.
 
1092
         */
 
1093
 
 
1094
        if (dq->up_sent >= max_connections - normal_connections) {
 
1095
                if (dq_debug > 19)
 
1096
                        printf("DQ[%d] terminating (queried UPs=%u >= %u)\n",
 
1097
                                dq->qid, dq->up_sent, max_connections - normal_connections);
 
1098
                goto terminate;
 
1099
        }
 
1100
 
 
1101
        /*
 
1102
         * If we have reached the maximum amount of pending queries (messages
 
1103
         * queued but not sent yet), then wait.  Otherwise, we might select
 
1104
         * another node, and be suddenly overwhelmed by replies if the pending
 
1105
         * queries are finally sent and the query was popular...
 
1106
         */
 
1107
 
 
1108
        if (dq->pending >= DQ_MAX_PENDING) {
 
1109
                if (dq_debug > 19)
 
1110
                        printf("DQ[%d] waiting for %u ms (pending=%u)\n",
 
1111
                                dq->qid, dq->result_timeout, dq->pending);
 
1112
                dq->results_ev = cq_insert(callout_queue,
 
1113
                        dq->result_timeout, dq_results_expired, dq);
 
1114
                return;
 
1115
        }
 
1116
 
 
1117
        nv = walloc(ncount * sizeof(struct next_up));
 
1118
        found = dq_fill_next_up(dq, nv, ncount);
 
1119
 
 
1120
        if (dq_debug > 19)
 
1121
                printf("DQ[%d] still %d UP%s to query (results %sso far: %u)\n",
 
1122
                        dq->qid, found, found == 1 ? "" : "s",
 
1123
                        (dq->flags & DQ_F_LEAF_GUIDED) ? "reported kept " : "", results);
 
1124
 
 
1125
        if (found == 0) {
 
1126
                dq_terminate(dq);       /* Terminate query: no more UP to send it to */
 
1127
                goto cleanup;
 
1128
        }
 
1129
 
 
1130
        /*
 
1131
         * Sort the array by increasing queue size, so that the nodes with
 
1132
         * the less pending data are listed first, with a preference to nodes
 
1133
         * with a QRP match.
 
1134
         */
 
1135
 
 
1136
        qsort(nv, found, sizeof(struct next_up), node_mq_qrp_cmp);
 
1137
 
 
1138
        /*
 
1139
         * Select the first node, and compute the proper TTL for the query.
 
1140
         *
 
1141
         * If the selected TTL is 1 and the node is QRP-capable and says
 
1142
         * it won't match, pick the next...
 
1143
         */
 
1144
 
 
1145
        for (i = 0; i < found; i++) {
 
1146
                node = nv[i].node;
 
1147
                ttl = dq_select_ttl(dq, node, found);
 
1148
 
 
1149
                if (
 
1150
                        ttl == 1 && NODE_UP_QRP(node) &&
 
1151
                        !qrp_node_can_route(node, dq->qhv)
 
1152
                ) {
 
1153
                        if (dq_debug > 19)
 
1154
                                printf("DQ[%d] TTL=1, skipping node #%d: can't route query!\n",
 
1155
                                        dq->qid, node->id);
 
1156
 
 
1157
                        continue;
 
1158
                }
 
1159
 
 
1160
                dq_send_query(dq, node, ttl);
 
1161
                sent = TRUE;
 
1162
                break;
 
1163
        }
 
1164
 
 
1165
        if (!sent) {
 
1166
                dq_terminate(dq);
 
1167
                goto cleanup;
 
1168
        }
 
1169
 
 
1170
        /*
 
1171
         * Adjust waiting period if we don't get enough results, indicating
 
1172
         * that the query might be for rare content.
 
1173
         */
 
1174
 
 
1175
        if (
 
1176
                dq->horizon > DQ_MIN_HORIZON &&
 
1177
                results < (DQ_LOW_RESULTS * dq->horizon / DQ_MIN_HORIZON)
 
1178
        ) {
 
1179
                dq->result_timeout -= DQ_TIMEOUT_ADJUST;
 
1180
                dq->result_timeout = MAX(DQ_MIN_TIMEOUT, dq->result_timeout);
 
1181
        }
 
1182
 
 
1183
        /*
 
1184
         * Install a watchdog for the query, to go on if we don't get
 
1185
         * all the results we want by then.
 
1186
         */
 
1187
 
 
1188
        timeout = dq->result_timeout;
 
1189
        if (dq->pending > 1)
 
1190
                timeout += (dq->pending - 1) * DQ_PENDING_TIMEOUT;
 
1191
 
 
1192
        if (dq_debug > 1)
 
1193
                printf("DQ[%d] (%d secs) timeout set to %d ms (pending=%d)\n",
 
1194
                        dq->qid, (gint) (time(NULL) - dq->start), timeout, dq->pending);
 
1195
 
 
1196
        dq->results_ev = cq_insert(callout_queue, timeout, dq_results_expired, dq);
 
1197
 
 
1198
cleanup:
 
1199
        wfree(nv, ncount * sizeof(struct next_up));
 
1200
        return;
 
1201
 
 
1202
terminate:
 
1203
        dq_terminate(dq);
 
1204
}
 
1205
 
 
1206
/**
 
1207
 * Send probe query (initial querying).
 
1208
 *
 
1209
 * This can generate up to DQ_PROBE_UP individual queries.
 
1210
 */
 
1211
static void
 
1212
dq_send_probe(dquery_t *dq)
 
1213
{
 
1214
        gnutella_node_t **nv;
 
1215
        gint ncount = max_connections;
 
1216
        gint found;
 
1217
        gint ttl = dq->ttl;
 
1218
        gint i;
 
1219
 
 
1220
        g_assert(dq->results_ev == NULL);
 
1221
 
 
1222
        nv = walloc(ncount * sizeof(gnutella_node_t *));
 
1223
        found = dq_fill_probe_up(dq, nv, ncount);
 
1224
 
 
1225
        if (dq_debug > 19)
 
1226
                printf("DQ[%d] found %d UP%s to probe\n",
 
1227
                        dq->qid, found, found == 1 ? "" : "s");
 
1228
 
 
1229
        /*
 
1230
         * If we don't find any suitable UP holding that content, then
 
1231
         * the query might be for something that is rare enough.  Start
 
1232
         * the sequential probing.
 
1233
         */
 
1234
 
 
1235
        if (found == 0) {
 
1236
                dq_send_next(dq);
 
1237
                goto cleanup;
 
1238
        }
 
1239
 
 
1240
        /*
 
1241
         * If we have 3 times the amount of UPs necessary for the probe,
 
1242
         * then content must be common, so reduce TTL by 1.  If we have 6 times
 
1243
         * the default amount, further reduce by 1.
 
1244
         */
 
1245
 
 
1246
        if (found > 6 * DQ_PROBE_UP)
 
1247
                ttl--;
 
1248
        if (found > 3 * DQ_PROBE_UP)
 
1249
                ttl--;
 
1250
 
 
1251
        ttl = MAX(ttl, 1);
 
1252
 
 
1253
        /*
 
1254
         * Sort the array by increasing queue size, so that the nodes with
 
1255
         * the less pending data are listed first.
 
1256
         */
 
1257
 
 
1258
        qsort(nv, found, sizeof(gnutella_node_t *), node_mq_cmp);
 
1259
 
 
1260
        /*
 
1261
         * Send the probe query to the first DQ_PROBE_UP nodes.
 
1262
         */
 
1263
 
 
1264
        for (i = 0; i < DQ_PROBE_UP && i < found; i++)
 
1265
                dq_send_query(dq, nv[i], ttl);
 
1266
 
 
1267
        /*
 
1268
         * Install a watchdog for the query, to go on if we don't get
 
1269
         * all the results we want by then.  We wait the specified amount
 
1270
         * of time per connection plus an extra DQ_PROBE_TIMEOUT because
 
1271
         * this is the first queries we send and their results will help us
 
1272
         * assse how popular the query is.
 
1273
         */
 
1274
 
 
1275
        dq->results_ev = cq_insert(callout_queue,
 
1276
                MIN(found, DQ_PROBE_UP) * (DQ_PROBE_TIMEOUT + dq->result_timeout),
 
1277
                dq_results_expired, dq);
 
1278
 
 
1279
cleanup:
 
1280
        wfree(nv, ncount * sizeof(gnutella_node_t *));
 
1281
}
 
1282
 
 
1283
/**
 
1284
 * Common initialization code for a dynamic query.
 
1285
 */
 
1286
static void
 
1287
dq_common_init(dquery_t *dq)
 
1288
{
 
1289
        struct gnutella_header *head;
 
1290
 
 
1291
        dq->qid = dyn_query_id++;
 
1292
        dq->queried = g_hash_table_new(NULL, NULL);
 
1293
        dq->result_timeout = DQ_QUERY_TIMEOUT;
 
1294
        dq->start = time(NULL);
 
1295
 
 
1296
        /*
 
1297
         * Make sure the dynamic query structure is cleaned up in at most
 
1298
         * DQ_MAX_LIFETIME ms, whatever happens.
 
1299
         */
 
1300
 
 
1301
        dq->expire_ev = cq_insert(callout_queue, DQ_MAX_LIFETIME,
 
1302
                dq_expired, dq);
 
1303
 
 
1304
        /*
 
1305
         * Record the query as being "alive".
 
1306
         */
 
1307
 
 
1308
        g_hash_table_insert(dqueries, dq, GINT_TO_POINTER(1));
 
1309
 
 
1310
        /*
 
1311
         * If query is not for the local node, insert it in `by_node_id'.
 
1312
         */
 
1313
 
 
1314
        if (dq->node_id != NODE_ID_LOCAL) {
 
1315
                gboolean found;
 
1316
                gpointer key;
 
1317
                gpointer value;
 
1318
                GSList *list;
 
1319
 
 
1320
                found = g_hash_table_lookup_extended(by_node_id,
 
1321
                                        GUINT_TO_POINTER(dq->node_id), &key, &value);
 
1322
 
 
1323
                if (found) {
 
1324
                        list = value;
 
1325
                        list = gm_slist_insert_after(list, list, dq);
 
1326
                        g_assert(list == value);                /* Head not changed */
 
1327
                } else {
 
1328
                        list = g_slist_prepend(NULL, dq);
 
1329
                        key = GUINT_TO_POINTER(dq->node_id);
 
1330
                        g_hash_table_insert(by_node_id, key, list);
 
1331
                }
 
1332
        }
 
1333
 
 
1334
        /*
 
1335
         * Record the MUID of this query, warning if a conflict occurs.
 
1336
         */
 
1337
 
 
1338
        head = (struct gnutella_header *) pmsg_start(dq->mb);
 
1339
 
 
1340
        if (g_hash_table_lookup(by_muid, head->muid))
 
1341
                g_warning("conflicting MUID \"%s\" for dynamic query, ignoring.",
 
1342
                        guid_hex_str(head->muid));
 
1343
        else {
 
1344
                gchar *muid = atom_guid_get(head->muid);
 
1345
                g_hash_table_insert(by_muid, muid, dq);
 
1346
        }
 
1347
 
 
1348
        if (dq_debug > 19)
 
1349
                printf("DQ[%d] created for node #%d: TTL=%d max_results=%d "
 
1350
                        "guidance=%s MUID=%s q=\"%s\"\n",
 
1351
                        dq->qid, dq->node_id, dq->ttl, dq->max_results,
 
1352
                        (dq->flags & DQ_F_LEAF_GUIDED) ? "yes" : "no",
 
1353
                        guid_hex_str(head->muid), QUERY_TEXT(pmsg_start(dq->mb)));
 
1354
}
 
1355
 
 
1356
/**
 
1357
 * Start new dynamic query out of a message we got from one of our leaves.
 
1358
 */
 
1359
void
 
1360
dq_launch_net(gnutella_node_t *n, query_hashvec_t *qhv)
 
1361
{
 
1362
        dquery_t *dq;
 
1363
        guint16 req_speed;
 
1364
        gboolean tagged_speed = FALSE;
 
1365
 
 
1366
        g_assert(NODE_IS_LEAF(n));
 
1367
 
 
1368
        dq = walloc0(sizeof(*dq));
 
1369
 
 
1370
        dq->node_id = n->id;
 
1371
        dq->mb = gmsg_split_to_pmsg(
 
1372
                (guchar *) &n->header, n->data,
 
1373
                n->size + sizeof(struct gnutella_header));
 
1374
        dq->qhv = qhvec_clone(qhv);
 
1375
        if (qhvec_has_source(qhv, QUERY_H_URN))
 
1376
                dq->max_results = DQ_LEAF_RESULTS / DQ_SHA1_DECIMATOR;
 
1377
        else
 
1378
                dq->max_results = DQ_LEAF_RESULTS;
 
1379
        dq->fin_results = dq->max_results * 100 / DQ_PERCENT_KEPT;
 
1380
        dq->ttl = MIN(n->header.ttl, DQ_MAX_TTL);
 
1381
        dq->alive = n->alive_pings;
 
1382
 
 
1383
        /*
 
1384
         * Determine whether this query will be leaf-guided.
 
1385
         *
 
1386
         * If the remote node performed the vendor message supported exchange,
 
1387
         * we have already determined whether its queries would be leaf-guided
 
1388
         * (i.e. advertised support for BEAR/11v1 is enough to assume the query
 
1389
         * is leaf-guided).
 
1390
         *
 
1391
         * But there is also a flag in the query that can be set to indicate
 
1392
         * leaf-guidance, and some vendors may choose to rely on that solely
 
1393
         * and never advertise the vendor messages related to this feature.
 
1394
         * So we also perform a test on the query flags in the speed field.
 
1395
         */
 
1396
 
 
1397
        if (NODE_GUIDES_QUERY(n))
 
1398
                dq->flags |= DQ_F_LEAF_GUIDED;
 
1399
 
 
1400
        READ_GUINT16_LE(n->data, req_speed);
 
1401
        tagged_speed = (req_speed & QUERY_SPEED_MARK) ? TRUE : FALSE;
 
1402
 
 
1403
        if (tagged_speed && (req_speed & QUERY_SPEED_LEAF_GUIDED))
 
1404
                dq->flags |= DQ_F_LEAF_GUIDED;
 
1405
 
 
1406
        if (dq_debug > 19)
 
1407
                printf("DQ node #%d %s <%s> (%s leaf-guidance) %squeries \"%s\"\n",
 
1408
                        n->id, node_ip(n), node_vendor(n),
 
1409
                        (dq->flags & DQ_F_LEAF_GUIDED) ? "with" : "no",
 
1410
                        tagged_speed && (req_speed & QUERY_SPEED_OOB_REPLY) ? "OOB-" : "",
 
1411
                        QUERY_TEXT(pmsg_start(dq->mb)));
 
1412
 
 
1413
        gnet_stats_count_general(GNR_LEAF_DYN_QUERIES, 1);
 
1414
 
 
1415
        dq_common_init(dq);
 
1416
        dq_sendto_leaves(dq, n);
 
1417
        dq_send_probe(dq);
 
1418
}
 
1419
 
 
1420
/**
 
1421
 * Start new dynamic query for a local search.
 
1422
 *
 
1423
 * We become the owner of the `mb' and `qhv' pointers.
 
1424
 */
 
1425
void
 
1426
dq_launch_local(gnet_search_t handle, pmsg_t *mb, query_hashvec_t *qhv)
 
1427
{
 
1428
        dquery_t *dq;
 
1429
 
 
1430
        /*
 
1431
         * Local queries are queued in the global SQ, for slow dispatching.
 
1432
         * If we're no longer an ultra node, ignore the request.
 
1433
         */
 
1434
 
 
1435
        if (current_peermode != NODE_P_ULTRA) {
 
1436
                if (dq_debug)
 
1437
                        g_warning("ignoring dynamic query \"%s\": no longer an ultra node",
 
1438
                                QUERY_TEXT(pmsg_start(mb)));
 
1439
 
 
1440
                pmsg_free(mb);
 
1441
                qhvec_free(qhv);
 
1442
                return;
 
1443
        }
 
1444
 
 
1445
        /*
 
1446
         * OK, create the local dynamic query.
 
1447
         */
 
1448
 
 
1449
        dq = walloc0(sizeof(*dq));
 
1450
 
 
1451
        dq->node_id = NODE_ID_LOCAL;
 
1452
        dq->mb = mb;
 
1453
        dq->qhv = qhv;
 
1454
        dq->sh = handle;
 
1455
        if (qhvec_has_source(qhv, QUERY_H_URN))
 
1456
                dq->max_results = DQ_LOCAL_RESULTS / DQ_SHA1_DECIMATOR;
 
1457
        else
 
1458
                dq->max_results = DQ_LOCAL_RESULTS;
 
1459
        dq->fin_results = dq->max_results * 100 / DQ_PERCENT_KEPT;
 
1460
        dq->ttl = MIN(my_ttl, DQ_MAX_TTL);
 
1461
        dq->alive = NULL;
 
1462
 
 
1463
        gnet_stats_count_general(GNR_LOCAL_DYN_QUERIES, 1);
 
1464
 
 
1465
        dq_common_init(dq);
 
1466
        dq_sendto_leaves(dq, NULL);
 
1467
        dq_send_probe(dq);
 
1468
}
 
1469
 
 
1470
/**
 
1471
 * Tells us a node ID has been removed.
 
1472
 * Get rid of all the queries registered for that node.
 
1473
 */
 
1474
void
 
1475
dq_node_removed(guint32 node_id)
 
1476
{
 
1477
        gboolean found;
 
1478
        gpointer key;
 
1479
        gpointer value;
 
1480
        GSList *sl;
 
1481
 
 
1482
        found = g_hash_table_lookup_extended(by_node_id,
 
1483
                                GUINT_TO_POINTER(node_id), &key, &value);
 
1484
 
 
1485
        if (!found)
 
1486
                return;         /* No dynamic query for this node */
 
1487
 
 
1488
        for (sl = value; sl; sl = g_slist_next(sl)) {
 
1489
                dquery_t *dq = (dquery_t *) sl->data;
 
1490
 
 
1491
                if (dq_debug > 19)
 
1492
                        printf("DQ[%d] terminated by node #%d removal\n",
 
1493
                                dq->qid, dq->node_id);
 
1494
 
 
1495
                /* Don't remove query from the table in dq_free() */
 
1496
                dq->flags |= DQ_F_ID_CLEANING;
 
1497
                dq_free(dq);
 
1498
        }
 
1499
 
 
1500
        g_hash_table_remove(by_node_id, key);
 
1501
        g_slist_free(value);
 
1502
}
 
1503
 
 
1504
/**
 
1505
 * Common code to count the results.
 
1506
 *
 
1507
 * @param muid is the dynamic query's MUID, i.e. the MUID used to send out
 
1508
 * the query on the network (important for OOB-proxied queries).
 
1509
 * @param count is the amount of results we received or got notified about
 
1510
 * @param oob if TRUE indicates that we just got notified about OOB results
 
1511
 * awaiting, but which have not been claimed yet.  If FALSE, the results
 
1512
 * have been validated and will be sent to the queryier.
 
1513
 *
 
1514
 * @return FALSE if the query was explicitly cancelled by the user
 
1515
 */
 
1516
static gboolean
 
1517
dq_count_results(gchar *muid, gint count, gboolean oob)
 
1518
{
 
1519
        dquery_t *dq;
 
1520
 
 
1521
        g_assert(count > 0);            /* Query hits with no result are bad! */
 
1522
 
 
1523
        dq = g_hash_table_lookup(by_muid, muid);
 
1524
 
 
1525
        if (dq == NULL)
 
1526
                return TRUE;
 
1527
 
 
1528
        if (dq->flags & DQ_F_LINGER)
 
1529
                dq->linger_results += count;
 
1530
        else if (oob)
 
1531
                dq->oob_results += count;       /* Not yet claimed */
 
1532
        else {
 
1533
                dq->results += count;
 
1534
                dq->new_results += count;
 
1535
        }
 
1536
 
 
1537
        if (dq_debug > 19) {
 
1538
                if (dq->node_id == NODE_ID_LOCAL)
 
1539
                        dq->kept_results = search_get_kept_results_by_handle(dq->sh);
 
1540
                if (dq->flags & DQ_F_LINGER)
 
1541
                        printf("DQ[%d] %s(%d secs; +%d secs) "
 
1542
                                "+%d %slinger_results=%d kept=%d\n",
 
1543
                                dq->qid, dq->node_id == NODE_ID_LOCAL ? "local " : "",
 
1544
                                (gint) (time(NULL) - dq->start),
 
1545
                                (gint) (time(NULL) - dq->stop),
 
1546
                                count, oob ? "OOB " : "",
 
1547
                                dq->linger_results, dq->kept_results);
 
1548
                else
 
1549
                        printf("DQ[%d] %s(%d secs) "
 
1550
                                "+%d %sresults=%d new=%d kept=%d oob=%d\n",
 
1551
                                dq->qid, dq->node_id == NODE_ID_LOCAL ? "local " : "",
 
1552
                                (gint) (time(NULL) - dq->start),
 
1553
                                count, oob ? "OOB " : "",
 
1554
                                dq->results, dq->new_results, dq->kept_results,
 
1555
                                dq->oob_results);
 
1556
        }
 
1557
 
 
1558
        return (dq->flags & DQ_F_USR_CANCELLED) ? FALSE : TRUE;
 
1559
}
 
1560
 
 
1561
/**
 
1562
 * Called every time we successfully parsed a query hit from the network.
 
1563
 * If we have a dynamic query registered for the MUID, increase the result
 
1564
 * count.
 
1565
 *
 
1566
 * @return FALSE if the query was explicitly cancelled by the user and
 
1567
 * results should be dropped, TRUE otherwise.  In other words, returns
 
1568
 * whether we should forward the results.
 
1569
 */
 
1570
gboolean
 
1571
dq_got_results(gchar *muid, guint count)
 
1572
{
 
1573
        return dq_count_results(muid, count, FALSE);
 
1574
}
 
1575
 
 
1576
/**
 
1577
 * Called every time we get notified about the presence of some OOB hits.
 
1578
 * The hits have not yet been claimed.
 
1579
 *
 
1580
 * @return FALSE if the query was explicitly cancelled by the user and
 
1581
 * results should not be claimed.
 
1582
 */
 
1583
gboolean
 
1584
dq_oob_results_ind(gchar *muid, gint count)
 
1585
{
 
1586
        return dq_count_results(muid, count, TRUE);
 
1587
}
 
1588
 
 
1589
/**
 
1590
 * Called when OOB results were received, after dq_got_results() was
 
1591
 * called to record them.  We need to undo the accounting made when
 
1592
 * dq_oob_results_ind() was called (to register unclaimed hits, which
 
1593
 * were finally claimed and parsed).
 
1594
 */
 
1595
void
 
1596
dq_oob_results_got(const gchar *muid, guint count)
 
1597
{
 
1598
        dquery_t *dq;
 
1599
 
 
1600
        /* Query hits with no result are bad! */
 
1601
        g_assert(count > 0 && count <= INT_MAX);
 
1602
 
 
1603
        dq = g_hash_table_lookup(by_muid, muid);
 
1604
 
 
1605
        if (dq == NULL)
 
1606
                return;
 
1607
 
 
1608
        /*
 
1609
         * Don't assert, as a remote node could lie and advertise n hits,
 
1610
         * yet deliver m with m > n.
 
1611
         */
 
1612
 
 
1613
        if (dq->oob_results > count)
 
1614
                dq->oob_results -= count;       /* Claimed them! */
 
1615
        else
 
1616
                dq->oob_results = 0;
 
1617
}
 
1618
 
 
1619
/**
 
1620
 * Called when we get a "Query Status Response" message where the querying
 
1621
 * node informs us about the amount of results he kept after filtering.
 
1622
 *
 
1623
 * @param muid is the search MUID.
 
1624
 *
 
1625
 * @param node_id is the ID of the node that sent us the status response.
 
1626
 * we check that it is the one for the query, to avoid a neighbour telling
 
1627
 * us about a search it did not issue!
 
1628
 *
 
1629
 * @param kept is the amount of results they kept.
 
1630
 * The special value 0xffff is a request to stop the query immediately.
 
1631
 */
 
1632
void
 
1633
dq_got_query_status(gchar *muid, guint32 node_id, guint16 kept)
 
1634
{
 
1635
        dquery_t *dq;
 
1636
 
 
1637
        dq = g_hash_table_lookup(by_muid, muid);
 
1638
 
 
1639
        if (dq == NULL)
 
1640
                return;
 
1641
 
 
1642
        if (dq->node_id != node_id)
 
1643
                return;
 
1644
 
 
1645
        dq->kept_results = kept;
 
1646
        dq->flags |= DQ_F_GOT_GUIDANCE;
 
1647
 
 
1648
        if (dq_debug > 19) {
 
1649
                if (dq->flags & DQ_F_LINGER)
 
1650
                        printf("DQ[%d] (%d secs; +%d secs) kept_results=%d\n",
 
1651
                                dq->qid, (gint) (time(NULL) - dq->start),
 
1652
                                (gint) (time(NULL) - dq->stop), dq->kept_results);
 
1653
                else
 
1654
                        printf("DQ[%d] (%d secs) %ssolicited, kept_results=%d\n",
 
1655
                                dq->qid, (gint) (time(NULL) - dq->start),
 
1656
                                (dq->flags & DQ_F_WAITING) ? "" : "un", dq->kept_results);
 
1657
        }
 
1658
 
 
1659
        /*
 
1660
         * If they want us to terminate querying, honour it.
 
1661
         * If the query is already in lingering mode, do nothing.
 
1662
         *
 
1663
         * Setting DQ_F_USR_CANCELLED will prevent any forwarding of
 
1664
         * query hits for this query.
 
1665
         */
 
1666
 
 
1667
        if (kept == 0xffff) {
 
1668
                if (dq_debug > 19)
 
1669
                        printf("DQ[%d] terminating at user's request\n", dq->qid);
 
1670
 
 
1671
                dq->flags |= DQ_F_USR_CANCELLED;
 
1672
 
 
1673
                if (!(dq->flags & DQ_F_LINGER)) {
 
1674
                        if (dq->results_ev)
 
1675
                                cq_cancel(callout_queue, dq->results_ev);
 
1676
                        dq->results_ev = NULL;
 
1677
                        dq_terminate(dq);
 
1678
                }
 
1679
                return;
 
1680
        }
 
1681
 
 
1682
        /*
 
1683
         * If we were waiting for status, we can resume the course of this query.
 
1684
         */
 
1685
 
 
1686
        if (dq->flags & DQ_F_WAITING) {
 
1687
                g_assert(dq->results_ev != NULL);       /* The "timeout" for status */
 
1688
 
 
1689
                cq_cancel(callout_queue, dq->results_ev);
 
1690
                dq->results_ev = NULL;
 
1691
                dq->flags &= ~DQ_F_WAITING;
 
1692
 
 
1693
                dq_send_next(dq);
 
1694
                return;
 
1695
        }
 
1696
}
 
1697
 
 
1698
struct cancel_context {
 
1699
        gnet_search_t handle;
 
1700
        GSList *cancelled;
 
1701
};
 
1702
 
 
1703
/**
 
1704
 * Cancel local query bearing the specified search handle.
 
1705
 * -- hash table iterator callback
 
1706
 */
 
1707
static void
 
1708
dq_cancel_local(gpointer key, gpointer unused_value, gpointer udata)
 
1709
{
 
1710
        struct cancel_context *ctx = (struct cancel_context *) udata;
 
1711
        dquery_t *dq = (dquery_t *) key;
 
1712
 
 
1713
        (void) unused_value;
 
1714
        if (dq->node_id != NODE_ID_LOCAL || dq->sh != ctx->handle)
 
1715
                return;
 
1716
 
 
1717
        /*
 
1718
         * Don't remove `dq' from the table over which we're iterating,
 
1719
         * just remember it in the context for later removal.
 
1720
         */
 
1721
 
 
1722
        dq->flags |= DQ_F_EXITING;              /* So nothing is removed from the table */
 
1723
        dq_free(dq);
 
1724
 
 
1725
        ctx->cancelled = g_slist_prepend(ctx->cancelled, dq);
 
1726
}
 
1727
 
 
1728
/**
 
1729
 * Invoked when a local search is closed.
 
1730
 */
 
1731
void
 
1732
dq_search_closed(gnet_search_t handle)
 
1733
{
 
1734
        struct cancel_context *ctx;
 
1735
        GSList *sl;
 
1736
 
 
1737
        ctx = walloc(sizeof(*ctx));
 
1738
        ctx->handle = handle;
 
1739
        ctx->cancelled = NULL;
 
1740
 
 
1741
        g_hash_table_foreach(dqueries, dq_cancel_local, ctx);
 
1742
 
 
1743
        for (sl = ctx->cancelled; sl; sl = g_slist_next(sl))
 
1744
                g_hash_table_remove(dqueries, sl->data);
 
1745
 
 
1746
        g_slist_free(ctx->cancelled);
 
1747
        wfree(ctx, sizeof(*ctx));
 
1748
}
 
1749
 
 
1750
/**
 
1751
 * Called for OOB-proxied queries when we get an "OOB Reply Indication"
 
1752
 * from remote hosts.  The aim is to determine whether the query still
 
1753
 * needs results, to decide whether we'll claim the advertised results
 
1754
 * or not.
 
1755
 *
 
1756
 * @param muid the message ID of the query
 
1757
 * @param wanted where the amount of results still expected is written
 
1758
 *
 
1759
 * @return TRUE if the query is still active, FALSE if it does not exist
 
1760
 * any more, in which case nothing is returned into `wanted'.
 
1761
 */
 
1762
gboolean
 
1763
dq_get_results_wanted(gchar *muid, guint32 *wanted)
 
1764
{
 
1765
        dquery_t *dq;
 
1766
 
 
1767
        dq = g_hash_table_lookup(by_muid, muid);
 
1768
 
 
1769
        if (dq == NULL)
 
1770
                return FALSE;
 
1771
 
 
1772
        if (dq->flags & DQ_F_USR_CANCELLED)
 
1773
                *wanted = 0;
 
1774
        else {
 
1775
                guint32 kept = dq_kept_results(dq);
 
1776
                *wanted = (kept > dq->max_results) ? 0 : dq->max_results - kept;
 
1777
        }
 
1778
 
 
1779
        return TRUE;
 
1780
}
 
1781
 
 
1782
/**
 
1783
 * Initialize dynamic querying.
 
1784
 */
 
1785
void
 
1786
dq_init(void)
 
1787
{
 
1788
        dqueries = g_hash_table_new(g_direct_hash, 0);
 
1789
        by_node_id = g_hash_table_new(NULL, NULL);
 
1790
        by_muid = g_hash_table_new(guid_hash, guid_eq);
 
1791
        fill_hosts();
 
1792
}
 
1793
 
 
1794
/**
 
1795
 * Hashtable iteration callback to free the dquery_t object held as the key.
 
1796
 */
 
1797
static void
 
1798
free_query(gpointer key, gpointer unused_value, gpointer unused_udata)
 
1799
{
 
1800
        dquery_t *dq = (dquery_t *) key;
 
1801
 
 
1802
        (void) unused_value;
 
1803
        (void) unused_udata;
 
1804
 
 
1805
        dq->flags |= DQ_F_EXITING;              /* So nothing is removed from the table */
 
1806
        dq_free(dq);
 
1807
}
 
1808
 
 
1809
/**
 
1810
 * Hashtable iteration callback to free the items remaining in the
 
1811
 * by_node_id table.  Normally, after having freed the dqueries table,
 
1812
 * there should not be anything remaining, hence warn!
 
1813
 */
 
1814
static void
 
1815
free_query_list(gpointer key, gpointer value, gpointer unused_udata)
 
1816
{
 
1817
        GSList *list = (GSList *) value;
 
1818
        gint count = g_slist_length(list);
 
1819
        GSList *sl;
 
1820
 
 
1821
        (void) unused_udata;
 
1822
        g_warning("remained %d un-freed dynamic quer%s for node #%u",
 
1823
                count, count == 1 ? "y" : "ies", GPOINTER_TO_UINT(key));
 
1824
 
 
1825
        for (sl = list; sl; sl = g_slist_next(sl)) {
 
1826
                dquery_t *dq = (dquery_t *) sl->data;
 
1827
 
 
1828
                /* Don't remove query from the table we're traversing in dq_free() */
 
1829
                dq->flags |= DQ_F_ID_CLEANING;
 
1830
                dq_free(dq);
 
1831
        }
 
1832
 
 
1833
        g_slist_free(list);
 
1834
}
 
1835
 
 
1836
/**
 
1837
 * Hashtable iteration callback to free the MUIDs in the `by_muid' table.
 
1838
 * Normally, after having freed the dqueries table, there should not be
 
1839
 * anything remaining, hence warn!
 
1840
 */
 
1841
static void
 
1842
free_muid(gpointer key, gpointer unused_value, gpointer unused_udata)
 
1843
{
 
1844
        (void) unused_value;
 
1845
        (void) unused_udata;
 
1846
        g_warning("remained un-freed MUID \"%s\" in dynamic queries",
 
1847
                guid_hex_str(key));
 
1848
 
 
1849
        atom_guid_free(key);
 
1850
}
 
1851
 
 
1852
/**
 
1853
 * Cleanup data structures used by dynamic querying.
 
1854
 */
 
1855
void
 
1856
dq_close(void)
 
1857
{
 
1858
        g_hash_table_foreach(dqueries, free_query, NULL);
 
1859
        g_hash_table_destroy(dqueries);
 
1860
 
 
1861
        g_hash_table_foreach(by_node_id, free_query_list, NULL);
 
1862
        g_hash_table_destroy(by_node_id);
 
1863
 
 
1864
        g_hash_table_foreach(by_muid, free_muid, NULL);
 
1865
        g_hash_table_destroy(by_muid);
 
1866
}
 
1867
 
 
1868
/* vi: set ts=4 sw=4 cindent: */