4
Copyright (C) Andrew Tridgell 2004
6
** NOTE! The following LGPL license applies to the ldb
7
** library. This does NOT imply that all of Samba is released
10
This library is free software; you can redistribute it and/or
11
modify it under the terms of the GNU Lesser General Public
12
License as published by the Free Software Foundation; either
13
version 3 of the License, or (at your option) any later version.
15
This library is distributed in the hope that it will be useful,
16
but WITHOUT ANY WARRANTY; without even the implied warranty of
17
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18
Lesser General Public License for more details.
20
You should have received a copy of the GNU Lesser General Public
21
License along with this library; if not, see <http://www.gnu.org/licenses/>.
27
* Component: ldb tdb backend - indexing
29
* Description: indexing routines for ldb tdb backend
31
* Author: Andrew Tridgell
35
#include "ldb/include/includes.h"
37
#include "ldb/ldb_tdb/ldb_tdb.h"
40
find an element in a list, using the given comparison function and
41
assuming that the list is already sorted using comp_fn
43
return -1 if not found, or the index of the first occurrence of needle if found
45
static int ldb_list_find(const void *needle,
46
const void *base, size_t nmemb, size_t size,
47
comparison_fn_t comp_fn)
49
const char *base_p = (const char *)base;
50
size_t min_i, max_i, test_i;
59
while (min_i < max_i) {
62
test_i = (min_i + max_i) / 2;
63
/* the following cast looks strange, but is
64
correct. The key to understanding it is that base_p
65
is a pointer to an array of pointers, so we have to
66
dereference it after casting to void **. The strange
67
const in the middle gives us the right type of pointer
68
after the dereference (tridge) */
69
r = comp_fn(needle, *(void * const *)(base_p + (size * test_i)));
71
/* scan back for first element */
73
comp_fn(needle, *(void * const *)(base_p + (size * (test_i-1)))) == 0) {
89
if (comp_fn(needle, *(void * const *)(base_p + (size * min_i))) == 0) {
102
return the dn key to be used for an index
105
static struct ldb_dn *ldb_dn_key(struct ldb_context *ldb,
106
const char *attr, const struct ldb_val *value)
111
const struct ldb_attrib_handler *h;
114
attr_folded = ldb_attr_casefold(ldb, attr);
119
h = ldb_attrib_handler(ldb, attr);
120
if (h->canonicalise_fn(ldb, ldb, value, &v) != 0) {
121
/* canonicalisation can be refused. For example,
122
an attribute that takes wildcards will refuse to canonicalise
123
if the value contains a wildcard */
124
talloc_free(attr_folded);
127
if (ldb_should_b64_encode(&v)) {
128
char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
129
if (!vstr) return NULL;
130
dn = talloc_asprintf(ldb, "%s:%s::%s", LTDB_INDEX, attr_folded, vstr);
132
if (v.data != value->data) {
135
talloc_free(attr_folded);
136
if (dn == NULL) return NULL;
140
dn = talloc_asprintf(ldb, "%s:%s:%.*s",
141
LTDB_INDEX, attr_folded, (int)v.length, (char *)v.data);
143
if (v.data != value->data) {
146
talloc_free(attr_folded);
149
ret = ldb_dn_explode(ldb, dn);
155
see if an attribute value is in the list of indexed attributes
157
static int ldb_msg_find_idx(const struct ldb_message *msg, const char *attr,
158
unsigned int *v_idx, const char *key)
161
for (i=0;i<msg->num_elements;i++) {
162
if (ldb_attr_cmp(msg->elements[i].name, key) == 0) {
163
const struct ldb_message_element *el =
165
for (j=0;j<el->num_values;j++) {
166
if (ldb_attr_cmp((char *)el->values[j].data, attr) == 0) {
178
/* used in sorting dn lists */
179
static int list_cmp(const char **s1, const char **s2)
181
return strcmp(*s1, *s2);
185
return a list of dn's that might match a simple indexed search or
187
static int ltdb_index_dn_simple(struct ldb_module *module,
188
const struct ldb_parse_tree *tree,
189
const struct ldb_message *index_list,
190
struct dn_list *list)
192
struct ldb_context *ldb = module->ldb;
196
struct ldb_message *msg;
201
/* if the attribute isn't in the list of indexed attributes then
202
this node needs a full search */
203
if (ldb_msg_find_idx(index_list, tree->u.equality.attr, NULL, LTDB_IDXATTR) == -1) {
207
/* the attribute is indexed. Pull the list of DNs that match the
209
dn = ldb_dn_key(ldb, tree->u.equality.attr, &tree->u.equality.value);
212
msg = talloc(list, struct ldb_message);
217
ret = ltdb_search_dn1(module, dn, msg);
219
if (ret == 0 || ret == -1) {
223
for (i=0;i<msg->num_elements;i++) {
224
struct ldb_message_element *el;
226
if (strcmp(msg->elements[i].name, LTDB_IDX) != 0) {
230
el = &msg->elements[i];
232
list->dn = talloc_array(list, char *, el->num_values);
238
for (j=0;j<el->num_values;j++) {
239
list->dn[list->count] =
240
talloc_strdup(list->dn, (char *)el->values[j].data);
241
if (!list->dn[list->count]) {
251
if (list->count > 1) {
252
qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t) list_cmp);
259
static int list_union(struct ldb_context *, struct dn_list *, const struct dn_list *);
262
return a list of dn's that might match a simple indexed search on
263
the special objectclass attribute
265
static int ltdb_index_dn_objectclass(struct ldb_module *module,
266
const struct ldb_parse_tree *tree,
267
const struct ldb_message *index_list,
268
struct dn_list *list)
270
struct ldb_context *ldb = module->ldb;
273
const char *target = (const char *)tree->u.equality.value.data;
274
const char **subclasses;
279
ret = ltdb_index_dn_simple(module, tree, index_list, list);
281
subclasses = ldb_subclass_list(module->ldb, target);
283
if (subclasses == NULL) {
287
for (i=0;subclasses[i];i++) {
288
struct ldb_parse_tree tree2;
289
struct dn_list *list2;
290
tree2.operation = LDB_OP_EQUALITY;
291
tree2.u.equality.attr = LTDB_OBJECTCLASS;
292
if (!tree2.u.equality.attr) {
295
tree2.u.equality.value.data =
296
(uint8_t *)talloc_strdup(list, subclasses[i]);
297
if (tree2.u.equality.value.data == NULL) {
300
tree2.u.equality.value.length = strlen(subclasses[i]);
301
list2 = talloc(list, struct dn_list);
303
talloc_free(tree2.u.equality.value.data);
306
if (ltdb_index_dn_objectclass(module, &tree2,
307
index_list, list2) == 1) {
308
if (list->count == 0) {
312
list_union(ldb, list, list2);
316
talloc_free(tree2.u.equality.value.data);
323
return a list of dn's that might match a leaf indexed search
325
static int ltdb_index_dn_leaf(struct ldb_module *module,
326
const struct ldb_parse_tree *tree,
327
const struct ldb_message *index_list,
328
struct dn_list *list)
330
if (ldb_attr_cmp(tree->u.equality.attr, LTDB_OBJECTCLASS) == 0) {
331
return ltdb_index_dn_objectclass(module, tree, index_list, list);
333
if (ldb_attr_dn(tree->u.equality.attr) == 0) {
334
list->dn = talloc_array(list, char *, 1);
335
if (list->dn == NULL) {
336
ldb_oom(module->ldb);
339
list->dn[0] = talloc_strdup(list->dn, (char *)tree->u.equality.value.data);
340
if (list->dn[0] == NULL) {
341
ldb_oom(module->ldb);
347
return ltdb_index_dn_simple(module, tree, index_list, list);
354
relies on the lists being sorted
356
static int list_intersect(struct ldb_context *ldb,
357
struct dn_list *list, const struct dn_list *list2)
359
struct dn_list *list3;
362
if (list->count == 0 || list2->count == 0) {
367
list3 = talloc(ldb, struct dn_list);
372
list3->dn = talloc_array(list3, char *, list->count);
379
for (i=0;i<list->count;i++) {
380
if (ldb_list_find(list->dn[i], list2->dn, list2->count,
381
sizeof(char *), (comparison_fn_t)strcmp) != -1) {
382
list3->dn[list3->count] = talloc_move(list3->dn, &list->dn[i]);
385
talloc_free(list->dn[i]);
389
talloc_free(list->dn);
390
list->dn = talloc_move(list, &list3->dn);
391
list->count = list3->count;
401
relies on the lists being sorted
403
static int list_union(struct ldb_context *ldb,
404
struct dn_list *list, const struct dn_list *list2)
408
unsigned int count = list->count;
410
if (list->count == 0 && list2->count == 0) {
415
d = talloc_realloc(list, list->dn, char *, list->count + list2->count);
421
for (i=0;i<list2->count;i++) {
422
if (ldb_list_find(list2->dn[i], list->dn, count,
423
sizeof(char *), (comparison_fn_t)strcmp) == -1) {
424
list->dn[list->count] = talloc_strdup(list->dn, list2->dn[i]);
425
if (!list->dn[list->count]) {
432
if (list->count != count) {
433
qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t)list_cmp);
439
static int ltdb_index_dn(struct ldb_module *module,
440
const struct ldb_parse_tree *tree,
441
const struct ldb_message *index_list,
442
struct dn_list *list);
448
static int ltdb_index_dn_or(struct ldb_module *module,
449
const struct ldb_parse_tree *tree,
450
const struct ldb_message *index_list,
451
struct dn_list *list)
453
struct ldb_context *ldb = module->ldb;
461
for (i=0;i<tree->u.list.num_elements;i++) {
462
struct dn_list *list2;
465
list2 = talloc(module, struct dn_list);
470
v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
483
talloc_free(list->dn);
490
list->dn = talloc_move(list, &list2->dn);
491
list->count = list2->count;
493
if (list_union(ldb, list, list2) == -1) {
502
if (list->count == 0) {
513
static int ltdb_index_dn_not(struct ldb_module *module,
514
const struct ldb_parse_tree *tree,
515
const struct ldb_message *index_list,
516
struct dn_list *list)
518
/* the only way to do an indexed not would be if we could
519
negate the not via another not or if we knew the total
520
number of database elements so we could know that the
521
existing expression covered the whole database.
523
instead, we just give up, and rely on a full index scan
524
(unless an outer & manages to reduce the list)
530
AND two index results
532
static int ltdb_index_dn_and(struct ldb_module *module,
533
const struct ldb_parse_tree *tree,
534
const struct ldb_message *index_list,
535
struct dn_list *list)
537
struct ldb_context *ldb = module->ldb;
545
for (i=0;i<tree->u.list.num_elements;i++) {
546
struct dn_list *list2;
549
list2 = talloc(module, struct dn_list);
554
v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
558
talloc_free(list->dn);
570
talloc_free(list->dn);
571
list->dn = talloc_move(list, &list2->dn);
572
list->count = list2->count;
574
if (list_intersect(ldb, list, list2) == -1) {
582
if (list->count == 0) {
583
talloc_free(list->dn);
592
return a list of dn's that might match an indexed search or
593
-1 if an error. return 0 for no matches, or 1 for matches
595
static int ltdb_index_dn(struct ldb_module *module,
596
const struct ldb_parse_tree *tree,
597
const struct ldb_message *index_list,
598
struct dn_list *list)
602
switch (tree->operation) {
604
ret = ltdb_index_dn_and(module, tree, index_list, list);
608
ret = ltdb_index_dn_or(module, tree, index_list, list);
612
ret = ltdb_index_dn_not(module, tree, index_list, list);
615
case LDB_OP_EQUALITY:
616
ret = ltdb_index_dn_leaf(module, tree, index_list, list);
619
case LDB_OP_SUBSTRING:
624
case LDB_OP_EXTENDED:
625
/* we can't index with fancy bitops yet */
634
filter a candidate dn_list from an indexed search into a set of results
635
extracting just the given attributes
637
static int ltdb_index_filter(const struct dn_list *dn_list,
638
struct ldb_handle *handle)
640
struct ltdb_context *ac = talloc_get_type(handle->private_data, struct ltdb_context);
641
struct ldb_reply *ares = NULL;
645
return LDB_ERR_OPERATIONS_ERROR;
648
for (i = 0; i < dn_list->count; i++) {
652
ares = talloc_zero(ac, struct ldb_reply);
654
handle->status = LDB_ERR_OPERATIONS_ERROR;
655
handle->state = LDB_ASYNC_DONE;
656
return LDB_ERR_OPERATIONS_ERROR;
659
ares->message = ldb_msg_new(ares);
660
if (!ares->message) {
661
handle->status = LDB_ERR_OPERATIONS_ERROR;
662
handle->state = LDB_ASYNC_DONE;
664
return LDB_ERR_OPERATIONS_ERROR;
668
dn = ldb_dn_explode(ares->message, dn_list->dn[i]);
671
return LDB_ERR_OPERATIONS_ERROR;
674
ret = ltdb_search_dn1(ac->module, dn, ares->message);
677
/* the record has disappeared? yes, this can happen */
683
/* an internal error */
685
return LDB_ERR_OPERATIONS_ERROR;
688
if (!ldb_match_msg(ac->module->ldb, ares->message, ac->tree, ac->base, ac->scope)) {
693
/* filter the attributes that the user wants */
694
ret = ltdb_filter_attrs(ares->message, ac->attrs);
697
handle->status = LDB_ERR_OPERATIONS_ERROR;
698
handle->state = LDB_ASYNC_DONE;
700
return LDB_ERR_OPERATIONS_ERROR;
703
ares->type = LDB_REPLY_ENTRY;
704
handle->state = LDB_ASYNC_PENDING;
705
handle->status = ac->callback(ac->module->ldb, ac->context, ares);
707
if (handle->status != LDB_SUCCESS) {
708
handle->state = LDB_ASYNC_DONE;
709
return handle->status;
717
search the database with a LDAP-like expression using indexes
718
returns -1 if an indexed search is not possible, in which
719
case the caller should call ltdb_search_full()
721
int ltdb_search_indexed(struct ldb_handle *handle)
723
struct ltdb_context *ac;
724
struct ltdb_private *ltdb;
725
struct dn_list *dn_list;
728
if (!(ac = talloc_get_type(handle->private_data,
729
struct ltdb_context)) ||
730
!(ltdb = talloc_get_type(ac->module->private_data,
731
struct ltdb_private))) {
735
if (ltdb->cache->indexlist->num_elements == 0 &&
736
ac->scope != LDB_SCOPE_BASE) {
737
/* no index list? must do full search */
741
dn_list = talloc(handle, struct dn_list);
742
if (dn_list == NULL) {
746
if (ac->scope == LDB_SCOPE_BASE) {
747
/* with BASE searches only one DN can match */
748
dn_list->dn = talloc_array(dn_list, char *, 1);
749
if (dn_list->dn == NULL) {
750
ldb_oom(ac->module->ldb);
753
dn_list->dn[0] = ldb_dn_linearize(dn_list, ac->base);
754
if (dn_list->dn[0] == NULL) {
755
ldb_oom(ac->module->ldb);
761
ret = ltdb_index_dn(ac->module, ac->tree, ltdb->cache->indexlist, dn_list);
765
/* we've got a candidate list - now filter by the full tree
766
and extract the needed attributes */
767
ret = ltdb_index_filter(dn_list, handle);
768
handle->status = ret;
769
handle->state = LDB_ASYNC_DONE;
772
talloc_free(dn_list);
778
add an index element where this is the first indexed DN for this value
780
static int ltdb_index_add1_new(struct ldb_context *ldb,
781
struct ldb_message *msg,
782
struct ldb_message_element *el,
785
struct ldb_message_element *el2;
787
/* add another entry */
788
el2 = talloc_realloc(msg, msg->elements,
789
struct ldb_message_element, msg->num_elements+1);
795
msg->elements[msg->num_elements].name = talloc_strdup(msg->elements, LTDB_IDX);
796
if (!msg->elements[msg->num_elements].name) {
799
msg->elements[msg->num_elements].num_values = 0;
800
msg->elements[msg->num_elements].values = talloc(msg->elements, struct ldb_val);
801
if (!msg->elements[msg->num_elements].values) {
804
msg->elements[msg->num_elements].values[0].length = strlen(dn);
805
msg->elements[msg->num_elements].values[0].data = discard_const_p(uint8_t, dn);
806
msg->elements[msg->num_elements].num_values = 1;
814
add an index element where this is not the first indexed DN for this
817
static int ltdb_index_add1_add(struct ldb_context *ldb,
818
struct ldb_message *msg,
819
struct ldb_message_element *el,
826
/* for multi-valued attributes we can end up with repeats */
827
for (i=0;i<msg->elements[idx].num_values;i++) {
828
if (strcmp(dn, (char *)msg->elements[idx].values[i].data) == 0) {
833
v2 = talloc_realloc(msg->elements, msg->elements[idx].values,
835
msg->elements[idx].num_values+1);
839
msg->elements[idx].values = v2;
841
msg->elements[idx].values[msg->elements[idx].num_values].length = strlen(dn);
842
msg->elements[idx].values[msg->elements[idx].num_values].data = discard_const_p(uint8_t, dn);
843
msg->elements[idx].num_values++;
849
add an index entry for one message element
851
static int ltdb_index_add1(struct ldb_module *module, const char *dn,
852
struct ldb_message_element *el, int v_idx)
854
struct ldb_context *ldb = module->ldb;
855
struct ldb_message *msg;
856
struct ldb_dn *dn_key;
860
msg = talloc(module, struct ldb_message);
866
dn_key = ldb_dn_key(ldb, el->name, &el->values[v_idx]);
872
talloc_steal(msg, dn_key);
874
ret = ltdb_search_dn1(module, dn_key, msg);
882
msg->num_elements = 0;
883
msg->elements = NULL;
886
for (i=0;i<msg->num_elements;i++) {
887
if (strcmp(LTDB_IDX, msg->elements[i].name) == 0) {
892
if (i == msg->num_elements) {
893
ret = ltdb_index_add1_new(ldb, msg, el, dn);
895
ret = ltdb_index_add1_add(ldb, msg, el, i, dn);
899
ret = ltdb_store(module, msg, TDB_REPLACE);
907
static int ltdb_index_add0(struct ldb_module *module, const char *dn,
908
struct ldb_message_element *elements, int num_el)
910
struct ltdb_private *ltdb =
911
(struct ltdb_private *)module->private_data;
919
if (ltdb->cache->indexlist->num_elements == 0) {
920
/* no indexed fields */
924
for (i = 0; i < num_el; i++) {
925
ret = ldb_msg_find_idx(ltdb->cache->indexlist, elements[i].name,
930
for (j = 0; j < elements[i].num_values; j++) {
931
ret = ltdb_index_add1(module, dn, &elements[i], j);
942
add the index entries for a new record
945
int ltdb_index_add(struct ldb_module *module, const struct ldb_message *msg)
947
struct ltdb_private *ltdb =
948
(struct ltdb_private *)module->private_data;
952
dn = ldb_dn_linearize(ltdb, msg->dn);
957
ret = ltdb_index_add0(module, dn, msg->elements, msg->num_elements);
966
delete an index entry for one message element
968
int ltdb_index_del_value(struct ldb_module *module, const char *dn,
969
struct ldb_message_element *el, int v_idx)
971
struct ldb_context *ldb = module->ldb;
972
struct ldb_message *msg;
973
struct ldb_dn *dn_key;
981
dn_key = ldb_dn_key(ldb, el->name, &el->values[v_idx]);
986
msg = talloc(dn_key, struct ldb_message);
992
ret = ltdb_search_dn1(module, dn_key, msg);
999
/* it wasn't indexed. Did we have an earlier error? If we did then
1001
talloc_free(dn_key);
1005
i = ldb_msg_find_idx(msg, dn, &j, LTDB_IDX);
1007
ldb_debug(ldb, LDB_DEBUG_ERROR,
1008
"ERROR: dn %s not found in %s\n", dn,
1009
ldb_dn_linearize(dn_key, dn_key));
1010
/* it ain't there. hmmm */
1011
talloc_free(dn_key);
1015
if (j != msg->elements[i].num_values - 1) {
1016
memmove(&msg->elements[i].values[j],
1017
&msg->elements[i].values[j+1],
1018
(msg->elements[i].num_values-(j+1)) *
1019
sizeof(msg->elements[i].values[0]));
1021
msg->elements[i].num_values--;
1023
if (msg->elements[i].num_values == 0) {
1024
ret = ltdb_delete_noindex(module, dn_key);
1026
ret = ltdb_store(module, msg, TDB_REPLACE);
1029
talloc_free(dn_key);
1035
delete the index entries for a record
1036
return -1 on failure
1038
int ltdb_index_del(struct ldb_module *module, const struct ldb_message *msg)
1040
struct ltdb_private *ltdb =
1041
(struct ltdb_private *)module->private_data;
1046
/* find the list of indexed fields */
1047
if (ltdb->cache->indexlist->num_elements == 0) {
1048
/* no indexed fields */
1052
if (ldb_dn_is_special(msg->dn)) {
1056
dn = ldb_dn_linearize(ltdb, msg->dn);
1061
for (i = 0; i < msg->num_elements; i++) {
1062
ret = ldb_msg_find_idx(ltdb->cache->indexlist, msg->elements[i].name,
1063
NULL, LTDB_IDXATTR);
1067
for (j = 0; j < msg->elements[i].num_values; j++) {
1068
ret = ltdb_index_del_value(module, dn, &msg->elements[i], j);
1082
traversal function that deletes all @INDEX records
1084
static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1086
const char *dn = "DN=" LTDB_INDEX ":";
1087
if (strncmp((char *)key.dptr, dn, strlen(dn)) == 0) {
1088
return tdb_delete(tdb, key);
1094
traversal function that adds @INDEX records during a re index
1096
static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1098
struct ldb_module *module = (struct ldb_module *)state;
1099
struct ldb_message *msg;
1104
if (strncmp((char *)key.dptr, "DN=@", 4) == 0 ||
1105
strncmp((char *)key.dptr, "DN=", 3) != 0) {
1109
msg = talloc(module, struct ldb_message);
1114
ret = ltdb_unpack_data(module, &data, msg);
1120
/* check if the DN key has changed, perhaps due to the
1121
case insensitivity of an element changing */
1122
key2 = ltdb_key(module, msg->dn);
1123
if (key2.dptr == NULL) {
1124
/* probably a corrupt record ... darn */
1125
ldb_debug(module->ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s\n",
1126
ldb_dn_linearize(msg, msg->dn));
1130
if (strcmp((char *)key2.dptr, (char *)key.dptr) != 0) {
1131
tdb_delete(tdb, key);
1132
tdb_store(tdb, key2, data, 0);
1134
talloc_free(key2.dptr);
1136
if (msg->dn == NULL) {
1137
dn = (char *)key.dptr + 3;
1139
if (!(dn = ldb_dn_linearize(msg->dn, msg->dn))) {
1145
ret = ltdb_index_add0(module, dn, msg->elements, msg->num_elements);
1153
force a complete reindex of the database
1155
int ltdb_reindex(struct ldb_module *module)
1157
struct ltdb_private *ltdb =
1158
(struct ltdb_private *)module->private_data;
1161
if (ltdb_cache_reload(module) != 0) {
1165
/* first traverse the database deleting any @INDEX records */
1166
ret = tdb_traverse(ltdb->tdb, delete_index, NULL);
1171
/* now traverse adding any indexes for normal LDB records */
1172
ret = tdb_traverse(ltdb->tdb, re_index, module);