2
* $Id: dq.c,v 1.25 2005/06/29 14:24:22 daichik Exp $
4
* Copyright (c) 2004, Raphael Manfredi
6
*----------------------------------------------------------------------
7
* This file is part of gtk-gnutella.
9
* gtk-gnutella is free software; you can redistribute it and/or modify
10
* it under the terms of the GNU General Public License as published by
11
* the Free Software Foundation; either version 2 of the License, or
12
* (at your option) any later version.
14
* gtk-gnutella is distributed in the hope that it will be useful,
15
* but WITHOUT ANY WARRANTY; without even the implied warranty of
16
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17
* GNU General Public License for more details.
19
* You should have received a copy of the GNU General Public License
20
* along with gtk-gnutella; if not, write to the Free Software
22
* 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
23
*----------------------------------------------------------------------
32
* @author Raphael Manfredi
38
RCSID("$Id: dq.c,v 1.25 2005/06/29 14:24:22 daichik Exp $");
40
#include <math.h> /* For pow() */
48
#include "gnet_stats.h"
54
#include "if/gnet_property_priv.h"
56
#include "lib/atoms.h"
58
#include "lib/endian.h"
59
#include "lib/glib-missing.h"
61
#include "lib/walloc.h"
63
#include "lib/override.h" /* Must be the last header included */
65
#define DQ_MAX_LIFETIME 300000 /**< 5 minutes, in ms */
66
#define DQ_PROBE_TIMEOUT 1500 /**< 1.5 s extra per connection */
67
#define DQ_PENDING_TIMEOUT 1200 /**< 1.2 s extra per pending message */
68
#define DQ_QUERY_TIMEOUT 3700 /**< 3.7 s */
69
#define DQ_TIMEOUT_ADJUST 100 /**< 100 ms at each connection */
70
#define DQ_MIN_TIMEOUT 1500 /**< 1.5 s at least between queries */
71
#define DQ_LINGER_TIMEOUT 120000 /**< 2 minutes, in ms */
72
#define DQ_STATUS_TIMEOUT 30000 /**< 30 s, in ms, to reply to query status */
73
#define DQ_MAX_PENDING 3 /**< Max pending queries we allow */
75
#define DQ_LEAF_RESULTS 50 /**< # of results targetted for leaves */
76
#define DQ_LOCAL_RESULTS 150 /**< # of results for local queries */
77
#define DQ_SHA1_DECIMATOR 25 /**< Divide expected by that much for SHA1 */
78
#define DQ_PROBE_UP 3 /**< Amount of UPs for initial probe */
79
#define DQ_MAX_HORIZON 500000 /**< Stop search after that many UP queried */
80
#define DQ_MIN_HORIZON 3000 /**< Min horizon before timeout adjustment */
81
#define DQ_LOW_RESULTS 10 /**< After DQ_MIN_HORIZON queried for adj. */
82
#define DQ_PERCENT_KEPT 5 /**< Assume 5% of results kept, worst case */
84
#define DQ_MAX_TTL 5 /**< Max TTL we can use */
85
#define DQ_AVG_ULTRA_NODES 2 /**< Average # of ultranodes a leaf queries */
87
#define DQ_MQ_EPSILON 2048 /**< Queues identical at +/- 2K */
88
#define DQ_FUZZY_FACTOR 0.80 /**< Corrector for theoretical horizon */
91
* Compute start of search string (which is NUL terminated) in query.
92
* The "+2" skips the "speed" field in the query.
94
#define QUERY_TEXT(m) ((m) + sizeof(struct gnutella_header) + 2)
99
typedef struct dquery {
100
guint32 qid; /**< Unique query ID, to detect ghosts */
101
guint32 node_id; /**< ID of the node that originated the query */
102
guint32 flags; /**< Operational flags */
103
gnet_search_t sh; /**< Search handle, if node ID = NODE_ID_LOCAL */
104
pmsg_t *mb; /**< The search messsage "template" */
105
query_hashvec_t *qhv; /**< Query hash vector for QRP filtering */
106
GHashTable *queried; /**< Contains node IDs that we queried so far */
107
guint8 ttl; /**< Initial query TTL */
108
guint32 horizon; /**< Theoretical horizon reached thus far */
109
guint32 up_sent; /**< # of UPs to which we really sent our query */
110
guint32 pending; /**< Pending query messages not ACK'ed yet by mq */
111
guint32 max_results; /**< Max results we're targetting for */
112
guint32 fin_results; /**< # of results terminating leaf-guided query */
113
guint32 oob_results; /**< Amount of unclaimed OOB results reported */
114
guint32 results; /**< Results we got so far for the query */
115
guint32 linger_results; /**< Results we got whilst lingering */
116
guint32 new_results; /**< New we got since last query status request */
117
guint32 kept_results; /**< Results they say they kept after filtering */
118
guint32 result_timeout; /**< The current timeout for getting results */
119
gpointer expire_ev; /**< Callout queue global expiration event */
120
gpointer results_ev; /**< Callout queue results expiration event */
121
gpointer alive; /**< Alive ping stats for computing timeouts */
122
time_t start; /**< Time at which it started */
123
time_t stop; /**< Time at which it was terminated */
124
pmsg_t *by_ttl[DQ_MAX_TTL]; /**< Copied mesages, one for each TTL */
127
#define DQ_F_ID_CLEANING 0x00000001 /**< Cleaning the `by_node_id' table */
128
#define DQ_F_LINGER 0x00000002 /**< Lingering to monitor extra results */
129
#define DQ_F_LEAF_GUIDED 0x00000004 /**< Leaf-guided query */
130
#define DQ_F_WAITING 0x00000008 /**< Waiting guidance reply from leaf */
131
#define DQ_F_GOT_GUIDANCE 0x00000010 /**< Got unsolicited leaf guidance */
132
#define DQ_F_USR_CANCELLED 0x00000020 /**< Explicitely cancelled by user */
133
#define DQ_F_EXITING 0x80000000 /**< Final cleanup at exit time */
136
* This table keeps track of all the dynamic query objects that we have
137
* created and which are alive.
139
static GHashTable *dqueries = NULL;
142
* This table keeps track of all the dynamic query objects created
143
* for a given node ID. The key is the node ID (converted to a pointer) and
144
* the value is a GSList containing all the queries for that node.
146
static GHashTable *by_node_id = NULL;
149
* This table keeps track of the association between a MUID and the
150
* dynamic query, so that when results come back, we may account them
151
* for the relevant query.
153
* The keys are MUIDs (GUID atoms), the values are the dquery_t object.
155
static GHashTable *by_muid = NULL;
158
* Information about query messages sent.
160
* We can't really add too many fields to the pmsg_t blocks we enqueue.
161
* However, what we do is we extend the pmsg_t to enrich them with a free
162
* routine, and we use that fact to be notified by the message queue when
163
* a message is freed. We can then probe into the flags to see whether
166
* But adding a free routine is about as much as we can do with a generic
167
* message system. To be able to keep track of more information about the
168
* queries we send, we associate each message with a structure containing
169
* meta-information about it.
172
dquery_t *dq; /**< The dynamic query that sent the query */
173
guint32 qid; /**< Query ID of the dynamic query */
174
guint32 node_id; /**< The ID of the node we sent it to */
175
guint16 degree; /**< The advertised degree of the destination node */
176
guint8 ttl; /**< The TTL used for that query */
180
* Structure produced by dq_fill_next_up, representing the nodes to which
181
* we could send the query, along with routing information to be able to favor
182
* UPs that report a QRP match early in the querying process.
185
gnutella_node_t *node; /**< Selected node */
186
query_hashvec_t *qhv; /**< Query hash vector for the query */
187
gint can_route; /**< -1 = unknown, otherwise TRUE / FALSE */
188
gint queue_pending; /**< -1 = unknown, otherwise cached queue size */
192
 * This table stores the pre-computation:
194
* hosts(degree,ttl) = Sum[(degree-1)^i, 0 <= i <= ttl-1]
196
* For degree = 1 to 40 and ttl = 1 to 5.
199
#define MAX_DEGREE 50
202
static guint32 hosts[MAX_DEGREE][MAX_TTL]; /**< Pre-computed horizon */
204
static guint32 dyn_query_id = 0;
206
static void dq_send_next(dquery_t *dq);
207
static void dq_terminate(dquery_t *dq);
210
* Compute the hosts[] table so that:
212
* hosts[i][j] = Sum[i^k, 0 <= k <= j]
214
* following the formula:
216
* hosts(degree,ttl) = Sum[(degree-1)^i, 0 <= i <= ttl-1]
224
for (i = 0; i < MAX_DEGREE; i++) {
226
for (j = 1; j < MAX_TTL; j++) {
227
hosts[i][j] = hosts[i][j-1] + pow(i, j);
230
printf("horizon(degree=%d, ttl=%d) = %d\n",
231
i+1, j+1, hosts[i][j]);
237
* Computes theoretical horizon reached by a query sent to a host advertising
238
* a given degree if it is going to travel ttl hops.
240
* We adjust the horizon by DQ_FUZZY_FACTOR, assuming that at each hop there
241
 * is some loss due to flow-control, network cycles, etc...
244
dq_get_horizon(gint degree, gint ttl)
249
g_assert(degree > 0);
252
i = MIN(degree, MAX_DEGREE) - 1;
253
j = MIN(ttl, MAX_TTL) - 1;
255
return hosts[i][j] * pow(DQ_FUZZY_FACTOR, j);
259
* Compute amount of results "kept" for the query, if we have this
260
* information available.
263
dq_kept_results(dquery_t *dq)
266
* For local queries, see how many results we kept so far.
268
* Since there's no notification for local queries about the
269
* amount of results kept (no "Query Status Results" messages)
270
* update the amount now.
273
if (dq->node_id == NODE_ID_LOCAL)
274
return dq->kept_results = search_get_kept_results_by_handle(dq->sh);
277
* We artificially reduce the kept results by a factor of
278
* DQ_AVG_ULTRA_NODES since the leaf node will report the total
279
* number of hits it got and kept from the other ultrapeers it is
280
* querying, and we assume it filtered out about the same proportion
281
* of hits everywhere.
284
return (dq->flags & DQ_F_LEAF_GUIDED) ?
285
(dq->kept_results / DQ_AVG_ULTRA_NODES) : dq->results;
289
* Select the proper TTL for the next query we're going to send to the
290
* specified node, assuming hosts are equally split among the remaining
291
* connections we have yet to query.
294
dq_select_ttl(dquery_t *dq, gnutella_node_t *node, gint connections)
298
gdouble results_per_up;
299
gdouble hosts_to_reach;
300
gdouble hosts_to_reach_via_node;
303
g_assert(connections > 0);
305
results = dq_kept_results(dq);
306
needed = dq->max_results - results;
308
g_assert(needed > 0); /* Or query would have been stopped */
310
results_per_up = dq->results / MAX(dq->horizon, 1);
311
hosts_to_reach = (gdouble) needed / MAX(results_per_up, (gdouble) 0.000001);
312
hosts_to_reach_via_node = hosts_to_reach / (gdouble) connections;
315
* Now iteratively find the TTL needed to reach the desired number
316
* of hosts, rounded to the lowest TTL to be conservative.
319
for (ttl = MIN(node->max_ttl, dq->ttl); ttl > 0; ttl--) {
320
if (dq_get_horizon(node->degree, ttl) <= hosts_to_reach_via_node)
325
ttl = MIN(node->max_ttl, dq->ttl);
333
* Create a pmsg_info structure, giving meta-information about the message
334
* we're about to send.
336
* @param dq DOCUMENT THIS!
337
* @param degree the degree of the node to which the message is sent
338
* @param ttl the TTL at which the message is sent
339
* @param node_id the ID of the node to which we send the message
341
static struct pmsg_info *
342
dq_pmi_alloc(dquery_t *dq, guint16 degree, guint8 ttl, guint32 node_id)
344
struct pmsg_info *pmi;
346
pmi = walloc(sizeof(*pmi));
350
pmi->degree = degree;
352
pmi->node_id = node_id;
358
* Get rid of the pmsg_info structure.
361
dq_pmi_free(struct pmsg_info *pmi)
363
wfree(pmi, sizeof(*pmi));
367
* Check whether query bearing the specified ID is still alive and has
368
* not been cancelled yet.
371
dq_alive(dquery_t *dq, guint32 qid)
373
if (!g_hash_table_lookup(dqueries, dq))
376
return dq->qid == qid; /* In case it reused the same address */
380
* Free routine for an extended message block.
383
dq_pmsg_free(pmsg_t *mb, gpointer arg)
385
struct pmsg_info *pmi = (struct pmsg_info *) arg;
386
dquery_t *dq = pmi->dq;
388
g_assert(pmsg_is_extended(mb));
391
* It is possible that whilst the message was in the message queue,
392
* the dynamic query was cancelled. Therefore, we need to ensure that
393
* the recorded query is still alive. We use both the combination of
394
* a hash table and a unique ID in case the address of an old dquery_t
395
* object is reused later.
398
if (!dq_alive(dq, pmi->qid))
401
g_assert(dq->pending > 0);
405
if (!pmsg_was_sent(mb)) {
411
* The message was not sent: we need to remove the entry for the
412
* node in the "dq->queried" structure, since the message did not
413
* make it through the network.
416
found = g_hash_table_lookup_extended(dq->queried,
417
GUINT_TO_POINTER(pmi->node_id), &key, &value);
419
g_assert(found); /* Or something is seriously corrupted */
421
g_hash_table_remove(dq->queried, GUINT_TO_POINTER(pmi->node_id));
424
printf("DQ[%d] %snode #%d degree=%d dropped message TTL=%d\n",
425
dq->qid, dq->node_id == NODE_ID_LOCAL ? "(local) " : "",
426
pmi->node_id, pmi->degree, pmi->ttl);
429
* If we don't have any more pending message and we're waiting
430
* for results, chances are we're going to wait for nothing!
432
* We can't re-enter mq from here, so reschedule the event for
433
* immediate delivery (in 1 ms, since we can't say 0).
436
if (0 == dq->pending && dq->results_ev)
437
cq_resched(callout_queue, dq->results_ev, 1);
441
* The message was sent. Adjust the total horizon reached thus far.
442
* Record that this UP got the query.
445
dq->horizon += dq_get_horizon(pmi->degree, pmi->ttl);
449
printf("DQ[%d] %snode #%d degree=%d sent message TTL=%d\n",
450
dq->qid, dq->node_id == NODE_ID_LOCAL ? "(local) " : "",
451
pmi->node_id, pmi->degree, pmi->ttl);
452
printf("DQ[%d] %s(%d secs) queried %d UP%s, "
453
"horizon=%d, results=%d\n",
454
dq->qid, dq->node_id == NODE_ID_LOCAL ? "local " : "",
455
(gint) (time(NULL) - dq->start),
456
dq->up_sent, dq->up_sent == 1 ? "" :"s",
457
dq->horizon, dq->results);
466
* Fetch message for a given TTL.
467
* If no such message exists yet, create it from the "template" message.
470
dq_pmsg_by_ttl(dquery_t *dq, gint ttl)
477
g_assert(ttl > 0 && ttl <= DQ_MAX_TTL);
479
mb = dq->by_ttl[ttl - 1];
484
* Copy does not exist for this TTL.
486
* First, create the data buffer, and copy the data from the
487
* template to this new buffer. We assume the original message
488
* is made of one data buffer only (no data block chaining yet).
491
t = dq->mb; /* Our "template" */
494
memcpy(pdata_start(db), pmsg_start(t), len);
497
* Patch the TTL in the new data buffer.
500
((struct gnutella_header *) pdata_start(db))->ttl = ttl;
503
* Now create a message for this data buffer and save it for later perusal.
506
mb = pmsg_alloc(pmsg_prio(t), db, 0, len);
507
dq->by_ttl[ttl - 1] = mb;
508
gmsg_install_presend(mb);
514
* Fill node vector with UP hosts to which we could send our probe query.
516
* @param dq DOCUMENT THIS!
517
* @param nv the pre-allocated node vector
518
* @param ncount the size of the vector
520
* @return amount of nodes we found.
523
dq_fill_probe_up(dquery_t *dq, gnutella_node_t **nv, gint ncount)
528
for (sl = node_all_nodes(); i < ncount && sl; sl = g_slist_next(sl)) {
529
struct gnutella_node *n = (struct gnutella_node *) sl->data;
531
if (!NODE_IS_ULTRA(n))
535
* Skip node if we haven't received the handshaking ping yet.
538
if (n->received == 0)
542
* Skip node if we're in TX flow-control (query will likely not
543
* be transmitted before the next timeout, and it could even be
544
* dropped) or if we're remotely flow-controlled (no queries to
548
if (NODE_IN_TX_FLOW_CONTROL(n) || n->hops_flow == 0)
551
if (!qrp_node_can_route(n, dq->qhv))
554
g_assert(NODE_IS_WRITABLE(n)); /* Checked by qrp_node_can_route() */
556
nv[i++] = n; /* Node or one of its leaves could answer */
563
* Fill node vector with UP hosts to which we could send our next query.
565
* @param dq DOCUMENT THIS!
566
* @param nv the pre-allocated node vector
567
* @param ncount the size of the vector
569
* @return amount of nodes we found.
572
dq_fill_next_up(dquery_t *dq, struct next_up *nv, gint ncount)
577
for (sl = node_all_nodes(); i < ncount && sl; sl = g_slist_next(sl)) {
578
struct gnutella_node *n = (struct gnutella_node *) sl->data;
581
if (!NODE_IS_ULTRA(n) || !NODE_IS_WRITABLE(n))
585
* Skip node if we haven't received the handshaking ping yet
586
* or if we already queried it.
589
if (n->received == 0)
592
if (g_hash_table_lookup(dq->queried, GUINT_TO_POINTER(n->id)))
596
* Skip node if we're in TX flow-control (query will likely not
597
* be transmitted before the next timeout, and it could even be
598
* dropped) or if we're remotely flow-controlled (no queries to
602
if (NODE_IN_TX_FLOW_CONTROL(n) || n->hops_flow == 0)
609
nup->can_route = -1; /* We don't know yet */
616
* Forward message to all the leaves but the one originating this query,
617
* according to their QRP tables.
620
* NB: In order to avoid qrt_build_query_target() selecting neighbouring
621
* ultra nodes that support last-hop QRP, we ensure the TTL is NOT 1.
622
* This is why we somehow duplicate qrt_route_query() here.
625
dq_sendto_leaves(dquery_t *dq, gnutella_node_t *source)
629
struct gnutella_header *head;
631
payload = pmsg_start(dq->mb);
632
head = (struct gnutella_header *) payload;
634
nodes = qrt_build_query_target(dq->qhv,
635
head->hops, MAX(head->ttl, 2), source);
638
g_message("DQ QRP %s (%d word/hash) forwarded to %d/%d leaves",
639
gmsg_infostr_full(payload), dq->qhv->count, g_slist_length(nodes),
642
gmsg_mb_sendto_all(nodes, dq->mb);
648
* Release the dynamic query object.
651
dq_free(dquery_t *dq)
657
struct gnutella_header *head;
659
g_assert(dq != NULL);
660
g_assert(g_hash_table_lookup(dqueries, dq));
663
printf("DQ[%d] %s(%d secs; +%d secs) node #%d ending: "
664
"ttl=%d, queried=%d, horizon=%d, results=%d+%d\n",
665
dq->qid, dq->node_id == NODE_ID_LOCAL ? "local " : "",
666
(gint) (time(NULL) - dq->start),
667
(dq->flags & DQ_F_LINGER) ? (gint) (time(NULL) - dq->stop) : 0,
668
dq->node_id, dq->ttl, dq->up_sent, dq->horizon, dq->results,
672
cq_cancel(callout_queue, dq->results_ev);
675
cq_cancel(callout_queue, dq->expire_ev);
677
if (dq->results >= dq->max_results)
678
gnet_stats_count_general(GNR_DYN_QUERIES_COMPLETED_FULL, 1);
679
else if (dq->results > 0)
680
gnet_stats_count_general(GNR_DYN_QUERIES_COMPLETED_PARTIAL, 1);
682
gnet_stats_count_general(GNR_DYN_QUERIES_COMPLETED_ZERO, 1);
684
if (dq->linger_results) {
685
if (dq->results >= dq->max_results)
686
gnet_stats_count_general(GNR_DYN_QUERIES_LINGER_EXTRA, 1);
687
else if (dq->results + dq->linger_results >= dq->max_results)
688
gnet_stats_count_general(GNR_DYN_QUERIES_LINGER_COMPLETED, 1);
690
gnet_stats_count_general(GNR_DYN_QUERIES_LINGER_RESULTS, 1);
693
g_hash_table_destroy(dq->queried);
697
for (i = 0; i < DQ_MAX_TTL; i++) {
698
if (dq->by_ttl[i] != NULL)
699
pmsg_free(dq->by_ttl[i]);
702
if (!(dq->flags & DQ_F_EXITING))
703
g_hash_table_remove(dqueries, dq);
706
* Remove query from the `by_node_id' table but only if the node ID
707
* is not the local node, since we don't store our own queries in
708
* there: if we disappear, everything else will!
710
* Also, if the DQ_F_ID_CLEANING flag is set, then someone is already
711
* cleaning up the `by_node_id' table for us, so we really must not
712
* mess with the table ourselves.
716
dq->node_id != NODE_ID_LOCAL &&
717
!(dq->flags & DQ_F_ID_CLEANING)
721
found = g_hash_table_lookup_extended(by_node_id,
722
GUINT_TO_POINTER(dq->node_id), &key, &value);
727
list = g_slist_remove(list, dq);
730
/* Last item removed, get rid of the entry */
731
g_hash_table_remove(by_node_id, key);
732
} else if (list != value)
733
g_hash_table_insert(by_node_id, key, list);
737
* Remove query's MUID.
740
head = (struct gnutella_header *) pmsg_start(dq->mb);
741
found = g_hash_table_lookup_extended(by_muid, head->muid, &key, &value);
743
if (found) { /* Could be missing if a MUID conflict occurred */
744
if (value == dq) { /* Make sure it's for us in case of conflicts */
745
g_hash_table_remove(by_muid, key);
750
pmsg_free(dq->mb); /* Now that we used the MUID */
752
wfree(dq, sizeof(*dq));
756
* Callout queue callback invoked when the dynamic query has expired.
759
dq_expired(cqueue_t *unused_cq, gpointer obj)
761
dquery_t *dq = (dquery_t *) obj;
766
printf("DQ[%d] expired\n", dq->qid);
768
dq->expire_ev = NULL; /* Indicates callback fired */
771
* If query was lingering, free it.
774
if (dq->flags & DQ_F_LINGER) {
780
* Put query in lingering mode, to be able to monitor extra results
781
* that come back after we stopped querying.
784
if (dq->results_ev) {
785
cq_cancel(callout_queue, dq->results_ev);
786
dq->results_ev = NULL;
793
* Callout queue callback invoked when the result timer has expired.
796
dq_results_expired(cqueue_t *unused_cq, gpointer obj)
798
dquery_t *dq = (dquery_t *) obj;
800
struct gnutella_header *head;
806
g_assert(!(dq->flags & DQ_F_LINGER));
808
dq->results_ev = NULL; /* Indicates callback fired */
811
 * If we were waiting for a status reply from the querier, well, we
812
* just timed-out. Cancel this query.
815
if (dq->flags & DQ_F_WAITING) {
817
printf("DQ[%d] (%d secs) timeout waiting for status results\n",
818
dq->qid, (gint) (time(NULL) - dq->start));
819
dq->flags &= ~DQ_F_WAITING;
825
* If host does not support leaf-guided queries, proceed to next ultra.
826
* If we got unsolicited guidance info whilst we were waiting for
827
* results to come back, also proceed.
829
* For local queries, DQ_F_LEAF_GUIDED is not set, so we'll continue
833
if (!(dq->flags & DQ_F_LEAF_GUIDED) || (dq->flags & DQ_F_GOT_GUIDANCE)) {
838
g_assert(dq->node_id != NODE_ID_LOCAL);
839
g_assert(dq->alive != NULL);
842
 * Ask the querier how many hits it kept so far.
845
n = node_active_by_id(dq->node_id);
849
printf("DQ[%d] (%d secs) node #%d appears to be dead\n",
850
dq->qid, (gint) (time(NULL) - dq->start), dq->node_id);
856
printf("DQ[%d] (%d secs) requesting node #%d for status (kept=%u)\n",
857
dq->qid, (gint) (time(NULL) - dq->start), dq->node_id,
860
dq->flags |= DQ_F_WAITING;
861
head = (struct gnutella_header *) pmsg_start(dq->mb);
863
vmsg_send_qstat_req(n, head->muid);
866
 * Compute the timeout using the available ping-pong round-trip
870
alive_get_roundtrip_ms(dq->alive, &avg, &last);
871
timeout = (avg + last) / 2000; /* An average, converted to seconds */
872
timeout = MAX(timeout, DQ_STATUS_TIMEOUT);
875
printf("DQ[%d] status reply timeout set to %d s\n", dq->qid, timeout);
877
dq->results_ev = cq_insert(callout_queue, timeout,
878
dq_results_expired, dq);
882
* Terminate active querying.
885
dq_terminate(dquery_t *dq)
887
g_assert(!(dq->flags & DQ_F_LINGER));
888
g_assert(dq->results_ev == NULL);
891
* Put the query in lingering mode, so we can continue to monitor
892
* results for some time after we stopped the dynamic querying.
895
if (dq->expire_ev != NULL)
896
cq_resched(callout_queue, dq->expire_ev, DQ_LINGER_TIMEOUT);
898
dq->expire_ev = cq_insert(callout_queue, DQ_LINGER_TIMEOUT,
901
dq->flags &= ~DQ_F_WAITING;
902
dq->flags |= DQ_F_LINGER;
903
dq->stop = time(NULL);
906
printf("DQ[%d] (%d secs) node #%d lingering: "
907
"ttl=%d, queried=%d, horizon=%d, results=%d\n",
908
dq->qid, (gint) (time(NULL) - dq->start), dq->node_id,
909
dq->ttl, dq->up_sent, dq->horizon, dq->results);
913
* qsort() callback for sorting nodes by increasing queue size.
916
node_mq_cmp(const void *np1, const void *np2)
918
gnutella_node_t *n1 = *(gnutella_node_t **) np1;
919
gnutella_node_t *n2 = *(gnutella_node_t **) np2;
920
gint qs1 = NODE_MQUEUE_PENDING(n1);
921
gint qs2 = NODE_MQUEUE_PENDING(n2);
924
* We don't cache the results of NODE_MQUEUE_PENDING() like we do in
925
* node_mq_qrp_cmp() because this is done ONCE per each dynamic query,
926
* (for the probe query only, and on an array containing only UP with
927
* a matching QRP) whereas the other comparison routine is called for
928
* each subsequent UP selection...
931
return CMP(qs1, qs2);
935
* qsort() callback for sorting nodes by increasing queue size, with a
936
* preference towards nodes that have a QRP match.
939
node_mq_qrp_cmp(const void *np1, const void *np2)
941
struct next_up *nu1 = (struct next_up *) np1;
942
struct next_up *nu2 = (struct next_up *) np2;
943
gnutella_node_t *n1 = nu1->node;
944
gnutella_node_t *n2 = nu2->node;
945
gint qs1 = nu1->queue_pending;
946
gint qs2 = nu2->queue_pending;
949
* Cache the results of NODE_MQUEUE_PENDING() since it involves
950
* several function calls to go down to the link layer buffers.
954
qs1 = nu1->queue_pending = NODE_MQUEUE_PENDING(n1);
956
qs2 = nu2->queue_pending = NODE_MQUEUE_PENDING(n2);
959
* If queue sizes are rather identical, compare based on whether
960
* the node can route or not (i.e. whether it advertises a "match"
963
* Since this determination is a rather costly operation, cache it.
966
if (ABS(qs1 - qs2) < DQ_MQ_EPSILON) {
967
if (nu1->can_route == -1)
968
nu1->can_route = qrp_node_can_route(n1, nu1->qhv);
969
if (nu2->can_route == -1)
970
nu2->can_route = qrp_node_can_route(n2, nu2->qhv);
972
if (!nu1->can_route == !nu2->can_route) {
973
/* Both can equally route or not route */
974
return CMP(qs1, qs2);
977
return nu1->can_route ? -1 : +1;
980
return qs1 < qs2 ? -1 : +1;
984
* Send individual query to selected node at the supplied TTL.
985
* If the node advertises a lower maximum TTL, the supplied TTL is
986
* adjusted down accordingly.
989
dq_send_query(dquery_t *dq, gnutella_node_t *n, gint ttl)
991
struct pmsg_info *pmi;
994
g_assert(!g_hash_table_lookup(dq->queried, GUINT_TO_POINTER(n->id)));
995
g_assert(NODE_IS_WRITABLE(n));
997
g_hash_table_insert(dq->queried,
998
GUINT_TO_POINTER(n->id), GINT_TO_POINTER(1));
1000
pmi = dq_pmi_alloc(dq, n->degree, MIN(n->max_ttl, ttl), n->id);
1003
* Now for the magic...
1005
 * We're going to clone the message template into an extended one,
1006
* which will be associated with a free routine. That way, we'll know
1007
* when the message is freed, and we'll get back the meta data (pmsg_info)
1008
* as an argument to the free routine.
1010
* Then, in the cloned message, adjust the TTL before sending.
1013
mb = dq_pmsg_by_ttl(dq, pmi->ttl);
1014
mb = pmsg_clone_extend(mb, dq_pmsg_free, pmi);
1017
printf("DQ[%d] (%d secs) queuing ttl=%d to #%d %s <%s> Q=%d bytes\n",
1018
dq->qid, (gint) delta_time(time(NULL), dq->start),
1019
pmi->ttl, n->id, node_ip(n), node_vendor(n),
1020
(gint) NODE_MQUEUE_PENDING(n));
1023
gmsg_mb_sendto_one(n, mb);
1027
* Iterate over the UPs which have not seen our query yet, select one and
1028
* send it the query.
1030
* If no more UP remain, terminate this query.
1033
dq_send_next(dquery_t *dq)
1036
gint ncount = max_connections;
1038
gnutella_node_t *node;
1042
gboolean sent = FALSE;
1045
g_assert(dq->results_ev == NULL);
1048
* Terminate query immediately if we're no longer an UP.
1051
if (current_peermode != NODE_P_ULTRA) {
1053
printf("DQ[%d] terminating (no longer an ultra node)\n", dq->qid);
1057
dq->flags &= ~DQ_F_GOT_GUIDANCE; /* Clear flag */
1060
* Terminate query if we reached the amount of results we wanted or
1061
* if we reached the maximum theoretical horizon.
1064
results = dq_kept_results(dq);
1066
if (dq->horizon >= DQ_MAX_HORIZON || results >= dq->max_results) {
1068
printf("DQ[%d] terminating (horizon=%u >= %d, results=%u >= %u)\n",
1069
dq->qid, dq->horizon, DQ_MAX_HORIZON, results, dq->max_results);
1074
* Even if the query is leaf-guided, they have to keep some amount
1075
* of results, or we're wasting our energy collecting results for
1076
 * something that has overly restrictive filters.
1078
* If they don't do leaf-guidance, the above test will trigger first!
1081
if (dq->results + dq->oob_results > dq->fin_results) {
1083
printf("DQ[%d] terminating (seen=%u + OOB=%u >= %u -- kept=%u)\n",
1084
dq->qid, dq->results, dq->oob_results, dq->fin_results,
1090
* If we already queried as many UPs as the maximum we configured,
1094
if (dq->up_sent >= max_connections - normal_connections) {
1096
printf("DQ[%d] terminating (queried UPs=%u >= %u)\n",
1097
dq->qid, dq->up_sent, max_connections - normal_connections);
1102
* If we have reached the maximum amount of pending queries (messages
1103
* queued but not sent yet), then wait. Otherwise, we might select
1104
* another node, and be suddenly overwhelmed by replies if the pending
1105
* queries are finally sent and the query was popular...
1108
if (dq->pending >= DQ_MAX_PENDING) {
1110
printf("DQ[%d] waiting for %u ms (pending=%u)\n",
1111
dq->qid, dq->result_timeout, dq->pending);
1112
dq->results_ev = cq_insert(callout_queue,
1113
dq->result_timeout, dq_results_expired, dq);
1117
nv = walloc(ncount * sizeof(struct next_up));
1118
found = dq_fill_next_up(dq, nv, ncount);
1121
printf("DQ[%d] still %d UP%s to query (results %sso far: %u)\n",
1122
dq->qid, found, found == 1 ? "" : "s",
1123
(dq->flags & DQ_F_LEAF_GUIDED) ? "reported kept " : "", results);
1126
dq_terminate(dq); /* Terminate query: no more UP to send it to */
1131
* Sort the array by increasing queue size, so that the nodes with
1132
* the less pending data are listed first, with a preference to nodes
1136
qsort(nv, found, sizeof(struct next_up), node_mq_qrp_cmp);
1139
* Select the first node, and compute the proper TTL for the query.
1141
* If the selected TTL is 1 and the node is QRP-capable and says
1142
* it won't match, pick the next...
1145
for (i = 0; i < found; i++) {
1147
ttl = dq_select_ttl(dq, node, found);
1150
ttl == 1 && NODE_UP_QRP(node) &&
1151
!qrp_node_can_route(node, dq->qhv)
1154
printf("DQ[%d] TTL=1, skipping node #%d: can't route query!\n",
1160
dq_send_query(dq, node, ttl);
1171
* Adjust waiting period if we don't get enough results, indicating
1172
* that the query might be for rare content.
1176
dq->horizon > DQ_MIN_HORIZON &&
1177
results < (DQ_LOW_RESULTS * dq->horizon / DQ_MIN_HORIZON)
1179
dq->result_timeout -= DQ_TIMEOUT_ADJUST;
1180
dq->result_timeout = MAX(DQ_MIN_TIMEOUT, dq->result_timeout);
1184
* Install a watchdog for the query, to go on if we don't get
1185
* all the results we want by then.
1188
timeout = dq->result_timeout;
1189
if (dq->pending > 1)
1190
timeout += (dq->pending - 1) * DQ_PENDING_TIMEOUT;
1193
printf("DQ[%d] (%d secs) timeout set to %d ms (pending=%d)\n",
1194
dq->qid, (gint) (time(NULL) - dq->start), timeout, dq->pending);
1196
dq->results_ev = cq_insert(callout_queue, timeout, dq_results_expired, dq);
1199
wfree(nv, ncount * sizeof(struct next_up));
1207
* Send probe query (initial querying).
1209
* This can generate up to DQ_PROBE_UP individual queries.
1212
dq_send_probe(dquery_t *dq)
1214
gnutella_node_t **nv;
1215
gint ncount = max_connections;
1220
g_assert(dq->results_ev == NULL);
1222
nv = walloc(ncount * sizeof(gnutella_node_t *));
1223
found = dq_fill_probe_up(dq, nv, ncount);
1226
printf("DQ[%d] found %d UP%s to probe\n",
1227
dq->qid, found, found == 1 ? "" : "s");
1230
* If we don't find any suitable UP holding that content, then
1231
* the query might be for something that is rare enough. Start
1232
* the sequential probing.
1241
* If we have 3 times the amount of UPs necessary for the probe,
1242
* then content must be common, so reduce TTL by 1. If we have 6 times
1243
* the default amount, further reduce by 1.
1246
if (found > 6 * DQ_PROBE_UP)
1248
if (found > 3 * DQ_PROBE_UP)
1254
* Sort the array by increasing queue size, so that the nodes with
1255
* the less pending data are listed first.
1258
qsort(nv, found, sizeof(gnutella_node_t *), node_mq_cmp);
1261
* Send the probe query to the first DQ_PROBE_UP nodes.
1264
for (i = 0; i < DQ_PROBE_UP && i < found; i++)
1265
dq_send_query(dq, nv[i], ttl);
1268
* Install a watchdog for the query, to go on if we don't get
1269
* all the results we want by then. We wait the specified amount
1270
* of time per connection plus an extra DQ_PROBE_TIMEOUT because
1271
 * these are the first queries we send and their results will help us
1272
 * assess how popular the query is.
1275
dq->results_ev = cq_insert(callout_queue,
1276
MIN(found, DQ_PROBE_UP) * (DQ_PROBE_TIMEOUT + dq->result_timeout),
1277
dq_results_expired, dq);
1280
wfree(nv, ncount * sizeof(gnutella_node_t *));
1284
* Common initialization code for a dynamic query.
1287
/*
 * NOTE(review): corrupted residue -- bare numeric lines are artifacts and
 * structural lines (return type, braces, the declarations of `found', `key',
 * `value', `list', the dq_expired callback argument, the if/else around the
 * by_node_id insertion) are missing.  Restore from upstream before editing.
 *
 * Visible intent: assign query ID, register the query in the `dqueries',
 * `by_node_id' and `by_muid' tables, and arm the DQ_MAX_LIFETIME expiry.
 */
dq_common_init(dquery_t *dq)
1289
	struct gnutella_header *head;
1291
	dq->qid = dyn_query_id++;
1292
	dq->queried = g_hash_table_new(NULL, NULL);
1293
	dq->result_timeout = DQ_QUERY_TIMEOUT;
1294
	dq->start = time(NULL);
1297
	 * Make sure the dynamic query structure is cleaned up in at most
1298
	 * DQ_MAX_LIFETIME ms, whatever happens.
1301
	dq->expire_ev = cq_insert(callout_queue, DQ_MAX_LIFETIME,
1305
	 * Record the query as being "alive".
1308
	g_hash_table_insert(dqueries, dq, GINT_TO_POINTER(1));
1311
	 * If query is not for the local node, insert it in `by_node_id'.
1314
	if (dq->node_id != NODE_ID_LOCAL) {
1320
		found = g_hash_table_lookup_extended(by_node_id,
1321
			GUINT_TO_POINTER(dq->node_id), &key, &value);
1325
			list = gm_slist_insert_after(list, list, dq);
1326
			g_assert(list == value);		/* Head not changed */
1328
			list = g_slist_prepend(NULL, dq);
1329
			key = GUINT_TO_POINTER(dq->node_id);
1330
			g_hash_table_insert(by_node_id, key, list);
1335
	 * Record the MUID of this query, warning if a conflict occurs.
1338
	head = (struct gnutella_header *) pmsg_start(dq->mb);
1340
	if (g_hash_table_lookup(by_muid, head->muid))
1341
		g_warning("conflicting MUID \"%s\" for dynamic query, ignoring.",
1342
			guid_hex_str(head->muid));
1344
		gchar *muid = atom_guid_get(head->muid);
1345
		g_hash_table_insert(by_muid, muid, dq);
1349
		printf("DQ[%d] created for node #%d: TTL=%d max_results=%d "
1350
			"guidance=%s MUID=%s q=\"%s\"\n",
1351
			dq->qid, dq->node_id, dq->ttl, dq->max_results,
1352
			(dq->flags & DQ_F_LEAF_GUIDED) ? "yes" : "no",
1353
			guid_hex_str(head->muid), QUERY_TEXT(pmsg_start(dq->mb)));
1357
* Start new dynamic query out of a message we got from one of our leaves.
1360
/*
 * NOTE(review): corrupted residue -- bare numeric lines are artifacts and
 * structural lines (return type, braces, `dquery_t *dq' / `req_speed'
 * declarations, the else branch of the URN decimation, the trailing
 * dq_common_init()/dq_send_probe() calls presumably following) are missing.
 * Restore from upstream before editing.
 *
 * Visible intent: build a dquery_t from a leaf's query message, size the
 * result target (decimated for URN queries), detect leaf-guidance both via
 * vendor-message support and the tagged speed field, then dispatch to leaves.
 */
dq_launch_net(gnutella_node_t *n, query_hashvec_t *qhv)
1364
	gboolean tagged_speed = FALSE;
1366
	g_assert(NODE_IS_LEAF(n));
1368
	dq = walloc0(sizeof(*dq));
1370
	dq->node_id = n->id;
1371
	dq->mb = gmsg_split_to_pmsg(
1372
		(guchar *) &n->header, n->data,
1373
		n->size + sizeof(struct gnutella_header));
1374
	dq->qhv = qhvec_clone(qhv);
1375
	if (qhvec_has_source(qhv, QUERY_H_URN))
1376
		dq->max_results = DQ_LEAF_RESULTS / DQ_SHA1_DECIMATOR;
1378
		dq->max_results = DQ_LEAF_RESULTS;
1379
	dq->fin_results = dq->max_results * 100 / DQ_PERCENT_KEPT;
1380
	dq->ttl = MIN(n->header.ttl, DQ_MAX_TTL);
1381
	dq->alive = n->alive_pings;
1384
	 * Determine whether this query will be leaf-guided.
1386
	 * If the remote node performed the vendor message supported exchange,
1387
	 * we have already determined whether its queries would be leaf-guided
1388
	 * (i.e. advertised support for BEAR/11v1 is enough to assume the query
1391
	 * But there is also a flag in the query that can be set to indicate
1392
	 * leaf-guidance, and some vendors may choose to rely on that solely
1393
	 * and never advertise the vendor messages related to this feature.
1394
	 * So we also perform a test on the query flags in the speed field.
1397
	if (NODE_GUIDES_QUERY(n))
1398
		dq->flags |= DQ_F_LEAF_GUIDED;
1400
	READ_GUINT16_LE(n->data, req_speed);
1401
	tagged_speed = (req_speed & QUERY_SPEED_MARK) ? TRUE : FALSE;
1403
	if (tagged_speed && (req_speed & QUERY_SPEED_LEAF_GUIDED))
1404
		dq->flags |= DQ_F_LEAF_GUIDED;
1407
		printf("DQ node #%d %s <%s> (%s leaf-guidance) %squeries \"%s\"\n",
1408
			n->id, node_ip(n), node_vendor(n),
1409
			(dq->flags & DQ_F_LEAF_GUIDED) ? "with" : "no",
1410
			tagged_speed && (req_speed & QUERY_SPEED_OOB_REPLY) ? "OOB-" : "",
1411
			QUERY_TEXT(pmsg_start(dq->mb)));
1413
	gnet_stats_count_general(GNR_LEAF_DYN_QUERIES, 1);
1416
	dq_sendto_leaves(dq, n);
1421
* Start new dynamic query for a local search.
1423
* We become the owner of the `mb' and `qhv' pointers.
1426
/*
 * NOTE(review): corrupted residue -- bare numeric lines are artifacts and
 * structural lines (return type, braces, `dquery_t *dq' declaration, the
 * early-return and resource release after the peermode warning, assignments
 * of dq->sh/dq->mb/dq->qhv, the else branch of the URN decimation) are
 * missing.  Restore from upstream before editing.
 *
 * Visible intent: start a dynamic query for a local search; refuse when no
 * longer an ultra node, otherwise build the dquery_t and dispatch to leaves.
 */
dq_launch_local(gnet_search_t handle, pmsg_t *mb, query_hashvec_t *qhv)
1431
	 * Local queries are queued in the global SQ, for slow dispatching.
1432
	 * If we're no longer an ultra node, ignore the request.
1435
	if (current_peermode != NODE_P_ULTRA) {
1437
		g_warning("ignoring dynamic query \"%s\": no longer an ultra node",
1438
			QUERY_TEXT(pmsg_start(mb)));
1446
	 * OK, create the local dynamic query.
1449
	dq = walloc0(sizeof(*dq));
1451
	dq->node_id = NODE_ID_LOCAL;
1455
	if (qhvec_has_source(qhv, QUERY_H_URN))
1456
		dq->max_results = DQ_LOCAL_RESULTS / DQ_SHA1_DECIMATOR;
1458
		dq->max_results = DQ_LOCAL_RESULTS;
1459
	dq->fin_results = dq->max_results * 100 / DQ_PERCENT_KEPT;
1460
	dq->ttl = MIN(my_ttl, DQ_MAX_TTL);
1463
	gnet_stats_count_general(GNR_LOCAL_DYN_QUERIES, 1);
1466
	dq_sendto_leaves(dq, NULL);
1471
* Tells us a node ID has been removed.
1472
* Get rid of all the queries registered for that node.
1475
/*
 * NOTE(review): corrupted residue -- bare numeric lines are artifacts and
 * structural lines (return type, braces, declarations of `found', `key',
 * `value', `sl', the `if (!found)' guard before the bare return, the
 * dq_free() call inside the loop) are missing.  Restore from upstream.
 *
 * Visible intent: when a node ID disappears, terminate every dynamic query
 * registered for it, then drop the whole list from `by_node_id'.
 */
dq_node_removed(guint32 node_id)
1482
	found = g_hash_table_lookup_extended(by_node_id,
1483
		GUINT_TO_POINTER(node_id), &key, &value);
1486
		return;		/* No dynamic query for this node */
1488
	for (sl = value; sl; sl = g_slist_next(sl)) {
1489
		dquery_t *dq = (dquery_t *) sl->data;
1492
			printf("DQ[%d] terminated by node #%d removal\n",
1493
				dq->qid, dq->node_id);
1495
		/* Don't remove query from the table in dq_free() */
1496
		dq->flags |= DQ_F_ID_CLEANING;
1500
	g_hash_table_remove(by_node_id, key);
1501
	g_slist_free(value);
1505
* Common code to count the results.
1507
* @param muid is the dynamic query's MUID, i.e. the MUID used to send out
1508
* the query on the network (important for OOB-proxied queries).
1509
* @param count is the amount of results we received or got notified about
1510
* @param oob if TRUE indicates that we just got notified about OOB results
1511
* awaiting, but which have not been claimed yet. If FALSE, the results
1512
* have been validated and will be sent to the querier.
1514
* @return FALSE if the query was explicitly cancelled by the user
1517
/*
 * NOTE(review): corrupted residue -- bare numeric lines are artifacts and
 * structural lines (return type, braces, `dquery_t *dq' declaration, the
 * NULL-lookup guard after g_hash_table_lookup(), the else/if joins around
 * the linger/oob/result accounting, the closing argument of the second
 * printf) are missing.  Restore from upstream before editing.
 *
 * Visible intent: account `count' results against the query identified by
 * `muid' (linger vs unclaimed-OOB vs validated buckets), emit debug traces,
 * and report whether the user has cancelled the query.
 */
dq_count_results(gchar *muid, gint count, gboolean oob)
1521
	g_assert(count > 0);		/* Query hits with no result are bad! */
1523
	dq = g_hash_table_lookup(by_muid, muid);
1528
	if (dq->flags & DQ_F_LINGER)
1529
		dq->linger_results += count;
1531
		dq->oob_results += count;	/* Not yet claimed */
1533
		dq->results += count;
1534
		dq->new_results += count;
1537
	if (dq_debug > 19) {
1538
		if (dq->node_id == NODE_ID_LOCAL)
1539
			dq->kept_results = search_get_kept_results_by_handle(dq->sh);
1540
		if (dq->flags & DQ_F_LINGER)
1541
			printf("DQ[%d] %s(%d secs; +%d secs) "
1542
				"+%d %slinger_results=%d kept=%d\n",
1543
				dq->qid, dq->node_id == NODE_ID_LOCAL ? "local " : "",
1544
				(gint) (time(NULL) - dq->start),
1545
				(gint) (time(NULL) - dq->stop),
1546
				count, oob ? "OOB " : "",
1547
				dq->linger_results, dq->kept_results);
1549
			printf("DQ[%d] %s(%d secs) "
1550
				"+%d %sresults=%d new=%d kept=%d oob=%d\n",
1551
				dq->qid, dq->node_id == NODE_ID_LOCAL ? "local " : "",
1552
				(gint) (time(NULL) - dq->start),
1553
				count, oob ? "OOB " : "",
1554
				dq->results, dq->new_results, dq->kept_results,
1558
	return (dq->flags & DQ_F_USR_CANCELLED) ? FALSE : TRUE;
1562
* Called every time we successfully parsed a query hit from the network.
1563
* If we have a dynamic query registered for the MUID, increase the result
1566
* @return FALSE if the query was explicitly cancelled by the user and
1567
* results should be dropped, TRUE otherwise. In other words, returns
1568
* whether we should forward the results.
1571
/*
 * NOTE(review): corrupted residue -- the numeric line is an artifact and the
 * return type (presumably gboolean, per dq_count_results) and braces are
 * missing.  Thin wrapper: records validated (non-OOB) hits for `muid'.
 */
dq_got_results(gchar *muid, guint count)
1573
	return dq_count_results(muid, count, FALSE);
1577
* Called every time we get notified about the presence of some OOB hits.
1578
* The hits have not yet been claimed.
1580
* @return FALSE if the query was explicitly cancelled by the user and
1581
* results should not be claimed.
1584
/*
 * NOTE(review): corrupted residue -- the numeric line is an artifact and the
 * return type (presumably gboolean, per dq_count_results) and braces are
 * missing.  Thin wrapper: records advertised-but-unclaimed OOB hits.
 */
dq_oob_results_ind(gchar *muid, gint count)
1586
	return dq_count_results(muid, count, TRUE);
1590
* Called when OOB results were received, after dq_got_results() was
1591
* called to record them. We need to undo the accounting made when
1592
* dq_oob_results_ind() was called (to register unclaimed hits, which
1593
* were finally claimed and parsed).
1596
/*
 * NOTE(review): corrupted residue -- bare numeric lines are artifacts and
 * structural lines (return type, braces, `dquery_t *dq' declaration, the
 * NULL-lookup guard, the else joining the two oob_results branches) are
 * missing.  Restore from upstream before editing.
 *
 * Visible intent: undo the unclaimed-OOB accounting once hits are actually
 * claimed, clamping at zero because remote nodes may deliver more than
 * they advertised.
 */
dq_oob_results_got(const gchar *muid, guint count)
1600
	/* Query hits with no result are bad! */
1601
	g_assert(count > 0 && count <= INT_MAX);
1603
	dq = g_hash_table_lookup(by_muid, muid);
1609
	 * Don't assert, as a remote node could lie and advertise n hits,
1610
	 * yet deliver m with m > n.
1613
	if (dq->oob_results > count)
1614
		dq->oob_results -= count;	/* Claimed them! */
1616
		dq->oob_results = 0;
1620
* Called when we get a "Query Status Response" message where the querying
1621
* node informs us about the amount of results he kept after filtering.
1623
* @param muid is the search MUID.
1625
* @param node_id is the ID of the node that sent us the status response.
1626
* we check that it is the one for the query, to avoid a neighbour telling
1627
* us about a search it did not issue!
1629
* @param kept is the amount of results they kept.
1630
* The special value 0xffff is a request to stop the query immediately.
1633
/*
 * NOTE(review): corrupted residue -- bare numeric lines are artifacts and
 * structural lines (return type, braces, `dquery_t *dq' declaration, the
 * NULL-lookup guard, the body/return after the node_id mismatch check, the
 * dq_terminate()-style call expected after the 0xffff cancellation, the
 * continuation after clearing DQ_F_WAITING) are missing.  Restore from
 * upstream before editing.
 *
 * Visible intent: process a "Query Status Response": record kept results,
 * mark guidance received, honour the 0xffff stop request (cancelling the
 * pending results event), and resume a query paused waiting for status.
 */
dq_got_query_status(gchar *muid, guint32 node_id, guint16 kept)
1637
	dq = g_hash_table_lookup(by_muid, muid);
1642
	if (dq->node_id != node_id)
1645
	dq->kept_results = kept;
1646
	dq->flags |= DQ_F_GOT_GUIDANCE;
1648
	if (dq_debug > 19) {
1649
		if (dq->flags & DQ_F_LINGER)
1650
			printf("DQ[%d] (%d secs; +%d secs) kept_results=%d\n",
1651
				dq->qid, (gint) (time(NULL) - dq->start),
1652
				(gint) (time(NULL) - dq->stop), dq->kept_results);
1654
			printf("DQ[%d] (%d secs) %ssolicited, kept_results=%d\n",
1655
				dq->qid, (gint) (time(NULL) - dq->start),
1656
				(dq->flags & DQ_F_WAITING) ? "" : "un", dq->kept_results);
1660
	 * If they want us to terminate querying, honour it.
1661
	 * If the query is already in lingering mode, do nothing.
1663
	 * Setting DQ_F_USR_CANCELLED will prevent any forwarding of
1664
	 * query hits for this query.
1667
	if (kept == 0xffff) {
1669
			printf("DQ[%d] terminating at user's request\n", dq->qid);
1671
		dq->flags |= DQ_F_USR_CANCELLED;
1673
		if (!(dq->flags & DQ_F_LINGER)) {
1675
			cq_cancel(callout_queue, dq->results_ev);
1676
			dq->results_ev = NULL;
1683
	 * If we were waiting for status, we can resume the course of this query.
1686
	if (dq->flags & DQ_F_WAITING) {
1687
		g_assert(dq->results_ev != NULL);	/* The "timeout" for status */
1689
		cq_cancel(callout_queue, dq->results_ev);
1690
		dq->results_ev = NULL;
1691
		dq->flags &= ~DQ_F_WAITING;
1698
/*
 * NOTE(review): corrupted residue -- this struct is truncated: the numeric
 * line is an artifact, and the member list is incomplete (later code uses
 * ctx->cancelled, a GSList *, and the closing brace is missing).  Restore
 * from upstream before editing.
 *
 * Iteration context used by dq_cancel_local()/dq_search_closed().
 */
struct cancel_context {
1699
	gnet_search_t handle;
1704
* Cancel local query bearing the specified search handle.
1705
* -- hash table iterator callback
1708
/*
 * NOTE(review): corrupted residue -- bare numeric lines are artifacts and
 * structural lines (return type, braces, the body/return of the mismatch
 * guard) are missing.  Restore from upstream before editing.
 *
 * Visible intent: hash-table iterator; collect local queries matching the
 * search handle into ctx->cancelled for later removal -- never mutate the
 * table being traversed.
 */
dq_cancel_local(gpointer key, gpointer unused_value, gpointer udata)
1710
	struct cancel_context *ctx = (struct cancel_context *) udata;
1711
	dquery_t *dq = (dquery_t *) key;
1713
	(void) unused_value;
1714
	if (dq->node_id != NODE_ID_LOCAL || dq->sh != ctx->handle)
1718
	 * Don't remove `dq' from the table over which we're iterating,
1719
	 * just remember it in the context for later removal.
1722
	dq->flags |= DQ_F_EXITING;	/* So nothing is removed from the table */
1725
	ctx->cancelled = g_slist_prepend(ctx->cancelled, dq);
1729
* Invoked when a local search is closed.
1732
/*
 * NOTE(review): corrupted residue -- bare numeric lines are artifacts and
 * structural lines (return type, braces, `GSList *sl' declaration) are
 * missing.  Restore from upstream before editing.
 *
 * Visible intent: on local search close, collect matching queries via
 * dq_cancel_local(), remove each from `dqueries', then release the
 * collection list and the context.
 */
dq_search_closed(gnet_search_t handle)
1734
	struct cancel_context *ctx;
1737
	ctx = walloc(sizeof(*ctx));
1738
	ctx->handle = handle;
1739
	ctx->cancelled = NULL;
1741
	g_hash_table_foreach(dqueries, dq_cancel_local, ctx);
1743
	for (sl = ctx->cancelled; sl; sl = g_slist_next(sl))
1744
		g_hash_table_remove(dqueries, sl->data);
1746
	g_slist_free(ctx->cancelled);
1747
	wfree(ctx, sizeof(*ctx));
1751
* Called for OOB-proxied queries when we get an "OOB Reply Indication"
1752
* from remote hosts. The aim is to determine whether the query still
1753
* needs results, to decide whether we'll claim the advertised results
1756
* @param muid the message ID of the query
1757
* @param wanted where the amount of results still expected is written
1759
* @return TRUE if the query is still active, FALSE if it does not exist
1760
* any more, in which case nothing is returned into `wanted'.
1763
/*
 * NOTE(review): corrupted residue -- bare numeric lines are artifacts and
 * structural lines (return type, braces, `dquery_t *dq' declaration, the
 * NULL-lookup guard returning FALSE, the branch writing *wanted = 0 for a
 * cancelled query, the final TRUE return) are missing.  Restore from
 * upstream before editing.
 *
 * Visible intent: report how many more results the query identified by
 * `muid' still wants (zero once kept results reach max_results).
 */
dq_get_results_wanted(gchar *muid, guint32 *wanted)
1767
	dq = g_hash_table_lookup(by_muid, muid);
1772
	if (dq->flags & DQ_F_USR_CANCELLED)
1775
		guint32 kept = dq_kept_results(dq);
1776
		*wanted = (kept > dq->max_results) ? 0 : dq->max_results - kept;
1783
* Initialize dynamic querying.
1788
/*
 * NOTE(review): corrupted residue -- the function signature (presumably
 * dq_init) and braces are missing; numeric lines are artifacts.  Restore
 * from upstream before editing.
 *
 * Visible intent: create the three module tables -- `dqueries' (set of live
 * queries), `by_node_id' (node id -> GSList of queries), `by_muid'
 * (GUID atom -> query).
 *
 * NOTE(review): g_hash_table_new(g_direct_hash, 0) -- the 0 stands for a
 * NULL GEqualFunc (direct pointer equality); prefer spelling it NULL.
 */
dqueries = g_hash_table_new(g_direct_hash, 0);
1789
	by_node_id = g_hash_table_new(NULL, NULL);
1790
	by_muid = g_hash_table_new(guid_hash, guid_eq);
1795
* Hashtable iteration callback to free the dquery_t object held as the key.
1798
/*
 * NOTE(review): corrupted residue -- bare numeric lines are artifacts and
 * structural lines (return type, braces, the dq_free(dq) call expected after
 * setting DQ_F_EXITING) are missing.  Restore from upstream before editing.
 *
 * Visible intent: shutdown iterator over `dqueries'; flag the query so its
 * teardown does not mutate the table currently being traversed.
 */
free_query(gpointer key, gpointer unused_value, gpointer unused_udata)
1800
	dquery_t *dq = (dquery_t *) key;
1802
	(void) unused_value;
1803
	(void) unused_udata;
1805
	dq->flags |= DQ_F_EXITING;	/* So nothing is removed from the table */
1810
* Hashtable iteration callback to free the items remaining in the
1811
* by_node_id table. Normally, after having freed the dqueries table,
1812
* there should not be anything remaining, hence warn!
1815
/*
 * NOTE(review): corrupted residue -- bare numeric lines are artifacts and
 * structural lines (return type, braces, `GSList *sl' declaration, the
 * dq_free(dq) call and loop close, the g_slist_free(list) teardown) are
 * missing.  Restore from upstream before editing.
 *
 * Visible intent: shutdown iterator over `by_node_id'; entries remaining
 * here after `dqueries' was emptied indicate a leak, hence the warning.
 */
free_query_list(gpointer key, gpointer value, gpointer unused_udata)
1817
	GSList *list = (GSList *) value;
1818
	gint count = g_slist_length(list);
1821
	(void) unused_udata;
1822
	g_warning("remained %d un-freed dynamic quer%s for node #%u",
1823
		count, count == 1 ? "y" : "ies", GPOINTER_TO_UINT(key));
1825
	for (sl = list; sl; sl = g_slist_next(sl)) {
1826
		dquery_t *dq = (dquery_t *) sl->data;
1828
		/* Don't remove query from the table we're traversing in dq_free() */
1829
		dq->flags |= DQ_F_ID_CLEANING;
1837
* Hashtable iteration callback to free the MUIDs in the `by_muid' table.
1838
* Normally, after having freed the dqueries table, there should not be
1839
* anything remaining, hence warn!
1842
/*
 * NOTE(review): corrupted residue -- bare numeric lines are artifacts and
 * structural lines (return type, braces, the guid_hex_str(key) argument of
 * the g_warning call) are missing.  Restore from upstream before editing.
 *
 * Visible intent: shutdown iterator over `by_muid'; leftover MUID atoms are
 * leaks, so warn, then release the GUID atom.
 */
free_muid(gpointer key, gpointer unused_value, gpointer unused_udata)
1844
	(void) unused_value;
1845
	(void) unused_udata;
1846
	g_warning("remained un-freed MUID \"%s\" in dynamic queries",
1849
	atom_guid_free(key);
1853
* Cleanup data structures used by dynamic querying.
1858
/*
 * NOTE(review): corrupted residue -- the function signature (presumably
 * dq_close) and braces are missing; numeric lines are artifacts.  Restore
 * from upstream before editing.
 *
 * Visible intent: module shutdown -- drain each table through its matching
 * cleanup iterator, then destroy it (order matters: `dqueries' first, so
 * the other two should already be empty).
 */
g_hash_table_foreach(dqueries, free_query, NULL);
1859
	g_hash_table_destroy(dqueries);
1861
	g_hash_table_foreach(by_node_id, free_query_list, NULL);
1862
	g_hash_table_destroy(by_node_id);
1864
	g_hash_table_foreach(by_muid, free_muid, NULL);
1865
	g_hash_table_destroy(by_muid);
1868
/* vi: set ts=4 sw=4 cindent: */