~zulcss/samba/server-dailies-3.4

« back to all changes in this revision

Viewing changes to source4/dsdb/samdb/ldb_modules/tests/samba3sam.py

  • Committer: Chuck Short
  • Date: 2010-09-28 20:38:39 UTC
  • Revision ID: zulcss@ubuntu.com-20100928203839-pgjulytsi9ue63x1
Initial version

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#!/usr/bin/python
 
2
 
 
3
# Unix SMB/CIFS implementation.
 
4
# Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2005-2008
 
5
# Copyright (C) Martin Kuehl <mkhl@samba.org> 2006
 
6
#
 
7
# This is a Python port of the original in testprogs/ejs/samba3sam.js
 
8
#   
 
9
# This program is free software; you can redistribute it and/or modify
 
10
# it under the terms of the GNU General Public License as published by
 
11
# the Free Software Foundation; either version 3 of the License, or
 
12
# (at your option) any later version.
 
13
#   
 
14
# This program is distributed in the hope that it will be useful,
 
15
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
16
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
17
# GNU General Public License for more details.
 
18
#   
 
19
# You should have received a copy of the GNU General Public License
 
20
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
21
#
 
22
 
 
23
"""Tests for the samba3sam LDB module, which maps Samba3 LDAP to AD LDAP."""
 
24
 
 
25
import os
 
26
import ldb
 
27
from ldb import SCOPE_DEFAULT, SCOPE_BASE, SCOPE_SUBTREE
 
28
from samba import Ldb, substitute_var
 
29
from samba.tests import LdbTestCase, TestCaseInTempDir, cmdline_loadparm
 
30
import samba.dcerpc.security
 
31
import samba.ndr
 
32
 
 
33
datadir = os.path.join(os.path.dirname(__file__), 
 
34
                       "../../../../../testdata/samba3")
 
35
 
 
36
def read_datafile(filename):
 
37
    return open(os.path.join(datadir, filename), 'r').read()
 
38
 
 
39
def ldb_debug(l, text):
 
40
    print text
 
41
 
 
42
 
 
43
class MapBaseTestCase(TestCaseInTempDir):
 
44
    """Base test case for mapping tests."""
 
45
 
 
46
    def setup_modules(self, ldb, s3, s4):
 
47
        ldb.add({"dn": "@MAP=samba3sam",
 
48
                 "@FROM": s4.basedn,
 
49
                 "@TO": "sambaDomainName=TESTS," + s3.basedn})
 
50
 
 
51
        ldb.add({"dn": "@MODULES",
 
52
                 "@LIST": "rootdse,paged_results,server_sort,asq,samldb,password_hash,operational,objectguid,rdn_name,samba3sam,partition"})
 
53
 
 
54
        ldb.add({"dn": "@PARTITION",
 
55
            "partition": ["%s:%s" % (s4.basedn, s4.url), 
 
56
                          "%s:%s" % (s3.basedn, s3.url)],
 
57
            "replicateEntries": ["@ATTRIBUTES", "@INDEXLIST"]})
 
58
 
 
59
    def setUp(self):
 
60
        super(MapBaseTestCase, self).setUp()
 
61
 
 
62
        def make_dn(basedn, rdn):
 
63
            return "%s,sambaDomainName=TESTS,%s" % (rdn, basedn)
 
64
 
 
65
        def make_s4dn(basedn, rdn):
 
66
            return "%s,%s" % (rdn, basedn)
 
67
 
 
68
        self.ldbfile = os.path.join(self.tempdir, "test.ldb")
 
69
        self.ldburl = "tdb://" + self.ldbfile
 
70
 
 
71
        tempdir = self.tempdir
 
72
 
 
73
        class Target:
 
74
            """Simple helper class that contains data for a specific SAM 
 
75
            connection."""
 
76
            def __init__(self, file, basedn, dn):
 
77
                self.file = os.path.join(tempdir, file)
 
78
                self.url = "tdb://" + self.file
 
79
                self.basedn = basedn
 
80
                self.substvars = {"BASEDN": self.basedn}
 
81
                self.db = Ldb(lp=cmdline_loadparm)
 
82
                self._dn = dn
 
83
 
 
84
            def dn(self, rdn):
 
85
                return self._dn(self.basedn, rdn)
 
86
 
 
87
            def connect(self):
 
88
                return self.db.connect(self.url)
 
89
 
 
90
            def setup_data(self, path):
 
91
                self.add_ldif(read_datafile(path))
 
92
 
 
93
            def subst(self, text):
 
94
                return substitute_var(text, self.substvars)
 
95
 
 
96
            def add_ldif(self, ldif):
 
97
                self.db.add_ldif(self.subst(ldif))
 
98
 
 
99
            def modify_ldif(self, ldif):
 
100
                self.db.modify_ldif(self.subst(ldif))
 
101
 
 
102
        self.samba4 = Target("samba4.ldb", "dc=vernstok,dc=nl", make_s4dn)
 
103
        self.samba3 = Target("samba3.ldb", "cn=Samba3Sam", make_dn)
 
104
        self.templates = Target("templates.ldb", "cn=templates", None)
 
105
 
 
106
        self.samba3.connect()
 
107
        self.templates.connect()
 
108
        self.samba4.connect()
 
109
 
 
110
    def tearDown(self):
 
111
        os.unlink(self.ldbfile)
 
112
        os.unlink(self.samba3.file)
 
113
        os.unlink(self.templates.file)
 
114
        os.unlink(self.samba4.file)
 
115
        super(MapBaseTestCase, self).tearDown()
 
116
 
 
117
    def assertSidEquals(self, text, ndr_sid):
 
118
        sid_obj1 = samba.ndr.ndr_unpack(samba.dcerpc.security.dom_sid,
 
119
                                        str(ndr_sid[0]))
 
120
        sid_obj2 = samba.dcerpc.security.dom_sid(text)
 
121
        self.assertEquals(sid_obj1, sid_obj2)
 
122
 
 
123
 
 
124
class Samba3SamTestCase(MapBaseTestCase):
 
125
 
 
126
    def setUp(self):
 
127
        super(Samba3SamTestCase, self).setUp()
 
128
        ldb = Ldb(self.ldburl, lp=cmdline_loadparm)
 
129
        self.samba3.setup_data("samba3.ldif")
 
130
        self.templates.setup_data("provision_samba3sam_templates.ldif")
 
131
        ldif = read_datafile("provision_samba3sam.ldif")
 
132
        ldb.add_ldif(self.samba4.subst(ldif))
 
133
        self.setup_modules(ldb, self.samba3, self.samba4)
 
134
        del ldb
 
135
        self.ldb = Ldb(self.ldburl, lp=cmdline_loadparm)
 
136
 
 
137
    def test_search_non_mapped(self):
 
138
        """Looking up by non-mapped attribute"""
 
139
        msg = self.ldb.search(expression="(cn=Administrator)")
 
140
        self.assertEquals(len(msg), 1)
 
141
        self.assertEquals(msg[0]["cn"], "Administrator")
 
142
 
 
143
    def test_search_non_mapped(self):
 
144
        """Looking up by mapped attribute"""
 
145
        msg = self.ldb.search(expression="(name=Backup Operators)")
 
146
        self.assertEquals(len(msg), 1)
 
147
        self.assertEquals(str(msg[0]["name"]), "Backup Operators")
 
148
 
 
149
    def test_old_name_of_renamed(self):
 
150
        """Looking up by old name of renamed attribute"""
 
151
        msg = self.ldb.search(expression="(displayName=Backup Operators)")
 
152
        self.assertEquals(len(msg), 0)
 
153
 
 
154
    def test_mapped_containing_sid(self):
 
155
        """Looking up mapped entry containing SID"""
 
156
        msg = self.ldb.search(expression="(cn=Replicator)")
 
157
        self.assertEquals(len(msg), 1)
 
158
        self.assertEquals(str(msg[0].dn), 
 
159
                          "cn=Replicator,ou=Groups,dc=vernstok,dc=nl")
 
160
        self.assertTrue("objectSid" in msg[0]) 
 
161
        self.assertSidEquals("S-1-5-21-4231626423-2410014848-2360679739-552",
 
162
                             msg[0]["objectSid"])
 
163
        oc = set(msg[0]["objectClass"])
 
164
        self.assertEquals(oc, set(["group"]))
 
165
 
 
166
    def test_search_by_objclass(self):
 
167
        """Looking up by objectClass"""
 
168
        msg = self.ldb.search(expression="(|(objectClass=user)(cn=Administrator))")
 
169
        self.assertEquals(set([str(m.dn) for m in msg]), 
 
170
                set(["unixName=Administrator,ou=Users,dc=vernstok,dc=nl", 
 
171
                     "unixName=nobody,ou=Users,dc=vernstok,dc=nl"]))
 
172
 
 
173
    def test_s3sam_modify(self):
 
174
        # Adding a record that will be fallbacked
 
175
        self.ldb.add({"dn": "cn=Foo", 
 
176
            "foo": "bar", 
 
177
            "blah": "Blie", 
 
178
            "cn": "Foo", 
 
179
            "showInAdvancedViewOnly": "TRUE"}
 
180
            )
 
181
 
 
182
        # Checking for existence of record (local)
 
183
        # TODO: This record must be searched in the local database, which is 
 
184
        # currently only supported for base searches
 
185
        # msg = ldb.search(expression="(cn=Foo)", ['foo','blah','cn','showInAdvancedViewOnly')]
 
186
        # TODO: Actually, this version should work as well but doesn't...
 
187
        # 
 
188
        #    
 
189
        msg = self.ldb.search(expression="(cn=Foo)", base="cn=Foo", 
 
190
                scope=SCOPE_BASE, 
 
191
                attrs=['foo','blah','cn','showInAdvancedViewOnly'])
 
192
        self.assertEquals(len(msg), 1)
 
193
        self.assertEquals(str(msg[0]["showInAdvancedViewOnly"]), "TRUE")
 
194
        self.assertEquals(str(msg[0]["foo"]), "bar")
 
195
        self.assertEquals(str(msg[0]["blah"]), "Blie")
 
196
 
 
197
        # Adding record that will be mapped
 
198
        self.ldb.add({"dn": "cn=Niemand,cn=Users,dc=vernstok,dc=nl",
 
199
                 "objectClass": "user",
 
200
                 "unixName": "bin",
 
201
                 "sambaUnicodePwd": "geheim",
 
202
                 "cn": "Niemand"})
 
203
 
 
204
        # Checking for existence of record (remote)
 
205
        msg = self.ldb.search(expression="(unixName=bin)", 
 
206
                              attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
 
207
        self.assertEquals(len(msg), 1)
 
208
        self.assertEquals(str(msg[0]["cn"]), "Niemand")
 
209
        self.assertEquals(str(msg[0]["sambaUnicodePwd"]), "geheim")
 
210
 
 
211
        # Checking for existence of record (local && remote)
 
212
        msg = self.ldb.search(expression="(&(unixName=bin)(sambaUnicodePwd=geheim))", 
 
213
                         attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
 
214
        self.assertEquals(len(msg), 1)           # TODO: should check with more records
 
215
        self.assertEquals(str(msg[0]["cn"]), "Niemand")
 
216
        self.assertEquals(str(msg[0]["unixName"]), "bin")
 
217
        self.assertEquals(str(msg[0]["sambaUnicodePwd"]), "geheim")
 
218
 
 
219
        # Checking for existence of record (local || remote)
 
220
        msg = self.ldb.search(expression="(|(unixName=bin)(sambaUnicodePwd=geheim))", 
 
221
                         attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
 
222
        #print "got %d replies" % len(msg)
 
223
        self.assertEquals(len(msg), 1)        # TODO: should check with more records
 
224
        self.assertEquals(str(msg[0]["cn"]), "Niemand")
 
225
        self.assertEquals(str(msg[0]["unixName"]), "bin")
 
226
        self.assertEquals(str(msg[0]["sambaUnicodePwd"]), "geheim")
 
227
 
 
228
        # Checking for data in destination database
 
229
        msg = self.samba3.db.search(expression="(cn=Niemand)")
 
230
        self.assertTrue(len(msg) >= 1)
 
231
        self.assertEquals(str(msg[0]["sambaSID"]), 
 
232
                "S-1-5-21-4231626423-2410014848-2360679739-2001")
 
233
        self.assertEquals(str(msg[0]["displayName"]), "Niemand")
 
234
 
 
235
        # Adding attribute...
 
236
        self.ldb.modify_ldif("""
 
237
dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
 
238
changetype: modify
 
239
add: description
 
240
description: Blah
 
241
""")
 
242
 
 
243
        # Checking whether changes are still there...
 
244
        msg = self.ldb.search(expression="(cn=Niemand)")
 
245
        self.assertTrue(len(msg) >= 1)
 
246
        self.assertEquals(str(msg[0]["cn"]), "Niemand")
 
247
        self.assertEquals(str(msg[0]["description"]), "Blah")
 
248
 
 
249
        # Modifying attribute...
 
250
        self.ldb.modify_ldif("""
 
251
dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
 
252
changetype: modify
 
253
replace: description
 
254
description: Blie
 
255
""")
 
256
 
 
257
        # Checking whether changes are still there...
 
258
        msg = self.ldb.search(expression="(cn=Niemand)")
 
259
        self.assertTrue(len(msg) >= 1)
 
260
        self.assertEquals(str(msg[0]["description"]), "Blie")
 
261
 
 
262
        # Deleting attribute...
 
263
        self.ldb.modify_ldif("""
 
264
dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
 
265
changetype: modify
 
266
delete: description
 
267
""")
 
268
 
 
269
        # Checking whether changes are no longer there...
 
270
        msg = self.ldb.search(expression="(cn=Niemand)")
 
271
        self.assertTrue(len(msg) >= 1)
 
272
        self.assertTrue(not "description" in msg[0])
 
273
 
 
274
        # Renaming record...
 
275
        self.ldb.rename("cn=Niemand,cn=Users,dc=vernstok,dc=nl", 
 
276
                        "cn=Niemand2,cn=Users,dc=vernstok,dc=nl")
 
277
 
 
278
        # Checking whether DN has changed...
 
279
        msg = self.ldb.search(expression="(cn=Niemand2)")
 
280
        self.assertEquals(len(msg), 1)
 
281
        self.assertEquals(str(msg[0].dn), 
 
282
                          "cn=Niemand2,cn=Users,dc=vernstok,dc=nl")
 
283
 
 
284
        # Deleting record...
 
285
        self.ldb.delete("cn=Niemand2,cn=Users,dc=vernstok,dc=nl")
 
286
 
 
287
        # Checking whether record is gone...
 
288
        msg = self.ldb.search(expression="(cn=Niemand2)")
 
289
        self.assertEquals(len(msg), 0)
 
290
 
 
291
 
 
292
class MapTestCase(MapBaseTestCase):
 
293
 
 
294
    def setUp(self):
 
295
        super(MapTestCase, self).setUp()
 
296
        ldb = Ldb(self.ldburl, lp=cmdline_loadparm)
 
297
        self.templates.setup_data("provision_samba3sam_templates.ldif")
 
298
        ldif = read_datafile("provision_samba3sam.ldif")
 
299
        ldb.add_ldif(self.samba4.subst(ldif))
 
300
        self.setup_modules(ldb, self.samba3, self.samba4)
 
301
        del ldb
 
302
        self.ldb = Ldb(self.ldburl, lp=cmdline_loadparm)
 
303
 
 
304
    def test_map_search(self):
 
305
        """Running search tests on mapped data."""
 
306
        self.samba3.db.add({
 
307
            "dn": "sambaDomainName=TESTS," + self.samba3.basedn,
 
308
            "objectclass": ["sambaDomain", "top"],
 
309
            "sambaSID": "S-1-5-21-4231626423-2410014848-2360679739",
 
310
            "sambaNextRid": "2000",
 
311
            "sambaDomainName": "TESTS"
 
312
            })
 
313
 
 
314
        # Add a set of split records
 
315
        self.ldb.add_ldif("""
 
316
dn: """+ self.samba4.dn("cn=X") + """
 
317
objectClass: user
 
318
cn: X
 
319
codePage: x
 
320
revision: x
 
321
dnsHostName: x
 
322
nextRid: y
 
323
lastLogon: x
 
324
description: x
 
325
objectSid: S-1-5-21-4231626423-2410014848-2360679739-552
 
326
primaryGroupID: 1-5-21-4231626423-2410014848-2360679739-512
 
327
 
 
328
""")
 
329
 
 
330
        self.ldb.add({
 
331
            "dn": self.samba4.dn("cn=Y"),
 
332
            "objectClass": "top",
 
333
            "cn": "Y",
 
334
            "codePage": "x",
 
335
            "revision": "x",
 
336
            "dnsHostName": "y",
 
337
            "nextRid": "y",
 
338
            "lastLogon": "y",
 
339
            "description": "x"})
 
340
 
 
341
        self.ldb.add({
 
342
            "dn": self.samba4.dn("cn=Z"),
 
343
            "objectClass": "top",
 
344
            "cn": "Z",
 
345
            "codePage": "x",
 
346
            "revision": "y",
 
347
            "dnsHostName": "z",
 
348
            "nextRid": "y",
 
349
            "lastLogon": "z",
 
350
            "description": "y"})
 
351
 
 
352
        # Add a set of remote records
 
353
 
 
354
        self.samba3.db.add({
 
355
            "dn": self.samba3.dn("cn=A"),
 
356
            "objectClass": "posixAccount",
 
357
            "cn": "A",
 
358
            "sambaNextRid": "x",
 
359
            "sambaBadPasswordCount": "x", 
 
360
            "sambaLogonTime": "x",
 
361
            "description": "x",
 
362
            "sambaSID": "S-1-5-21-4231626423-2410014848-2360679739-552",
 
363
            "sambaPrimaryGroupSID": "S-1-5-21-4231626423-2410014848-2360679739-512"})
 
364
 
 
365
        self.samba3.db.add({
 
366
            "dn": self.samba3.dn("cn=B"),
 
367
            "objectClass": "top",
 
368
            "cn": "B",
 
369
            "sambaNextRid": "x",
 
370
            "sambaBadPasswordCount": "x",
 
371
            "sambaLogonTime": "y",
 
372
            "description": "x"})
 
373
 
 
374
        self.samba3.db.add({
 
375
            "dn": self.samba3.dn("cn=C"),
 
376
            "objectClass": "top",
 
377
            "cn": "C",
 
378
            "sambaNextRid": "x",
 
379
            "sambaBadPasswordCount": "y",
 
380
            "sambaLogonTime": "z",
 
381
            "description": "y"})
 
382
 
 
383
        # Testing search by DN
 
384
 
 
385
        # Search remote record by local DN
 
386
        dn = self.samba4.dn("cn=A")
 
387
        res = self.ldb.search(dn, scope=SCOPE_BASE, 
 
388
                attrs=["dnsHostName", "lastLogon"])
 
389
        self.assertEquals(len(res), 1)
 
390
        self.assertEquals(str(res[0].dn), dn)
 
391
        self.assertTrue(not "dnsHostName" in res[0])
 
392
        self.assertEquals(str(res[0]["lastLogon"]), "x")
 
393
 
 
394
        # Search remote record by remote DN
 
395
        dn = self.samba3.dn("cn=A")
 
396
        res = self.samba3.db.search(dn, scope=SCOPE_BASE, 
 
397
                attrs=["dnsHostName", "lastLogon", "sambaLogonTime"])
 
398
        self.assertEquals(len(res), 1)
 
399
        self.assertEquals(str(res[0].dn), dn)
 
400
        self.assertTrue(not "dnsHostName" in res[0])
 
401
        self.assertTrue(not "lastLogon" in res[0])
 
402
        self.assertEquals(str(res[0]["sambaLogonTime"]), "x")
 
403
 
 
404
        # Search split record by local DN
 
405
        dn = self.samba4.dn("cn=X")
 
406
        res = self.ldb.search(dn, scope=SCOPE_BASE, 
 
407
                attrs=["dnsHostName", "lastLogon"])
 
408
        self.assertEquals(len(res), 1)
 
409
        self.assertEquals(str(res[0].dn), dn)
 
410
        self.assertEquals(str(res[0]["dnsHostName"]), "x")
 
411
        self.assertEquals(str(res[0]["lastLogon"]), "x")
 
412
 
 
413
        # Search split record by remote DN
 
414
        dn = self.samba3.dn("cn=X")
 
415
        res = self.samba3.db.search(dn, scope=SCOPE_BASE, 
 
416
                attrs=["dnsHostName", "lastLogon", "sambaLogonTime"])
 
417
        self.assertEquals(len(res), 1)
 
418
        self.assertEquals(str(res[0].dn), dn)
 
419
        self.assertTrue(not "dnsHostName" in res[0])
 
420
        self.assertTrue(not "lastLogon" in res[0])
 
421
        self.assertEquals(str(res[0]["sambaLogonTime"]), "x")
 
422
 
 
423
        # Testing search by attribute
 
424
 
 
425
        # Search by ignored attribute
 
426
        res = self.ldb.search(expression="(revision=x)", scope=SCOPE_DEFAULT, 
 
427
                attrs=["dnsHostName", "lastLogon"])
 
428
        self.assertEquals(len(res), 2)
 
429
        self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
 
430
        self.assertEquals(str(res[0]["dnsHostName"]), "y")
 
431
        self.assertEquals(str(res[0]["lastLogon"]), "y")
 
432
        self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
 
433
        self.assertEquals(str(res[1]["dnsHostName"]), "x")
 
434
        self.assertEquals(str(res[1]["lastLogon"]), "x")
 
435
 
 
436
        # Search by kept attribute
 
437
        res = self.ldb.search(expression="(description=y)", 
 
438
                scope=SCOPE_DEFAULT, attrs=["dnsHostName", "lastLogon"])
 
439
        self.assertEquals(len(res), 2)
 
440
        self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Z"))
 
441
        self.assertEquals(str(res[0]["dnsHostName"]), "z")
 
442
        self.assertEquals(str(res[0]["lastLogon"]), "z")
 
443
        self.assertEquals(str(res[1].dn), self.samba4.dn("cn=C"))
 
444
        self.assertTrue(not "dnsHostName" in res[1])
 
445
        self.assertEquals(str(res[1]["lastLogon"]), "z")
 
446
 
 
447
        # Search by renamed attribute
 
448
        res = self.ldb.search(expression="(badPwdCount=x)", scope=SCOPE_DEFAULT,
 
449
                              attrs=["dnsHostName", "lastLogon"])
 
450
        self.assertEquals(len(res), 2)
 
451
        self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
 
452
        self.assertTrue(not "dnsHostName" in res[0])
 
453
        self.assertEquals(str(res[0]["lastLogon"]), "y")
 
454
        self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
 
455
        self.assertTrue(not "dnsHostName" in res[1])
 
456
        self.assertEquals(str(res[1]["lastLogon"]), "x")
 
457
 
 
458
        # Search by converted attribute
 
459
        # TODO:
 
460
        #   Using the SID directly in the parse tree leads to conversion
 
461
        #   errors, letting the search fail with no results.
 
462
        #res = self.ldb.search("(objectSid=S-1-5-21-4231626423-2410014848-2360679739-552)", scope=SCOPE_DEFAULT, attrs)
 
463
        res = self.ldb.search(expression="(objectSid=*)", base=None, scope=SCOPE_DEFAULT, attrs=["dnsHostName", "lastLogon", "objectSid"])
 
464
        self.assertEquals(len(res), 3)
 
465
        self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X"))
 
466
        self.assertEquals(str(res[0]["dnsHostName"]), "x")
 
467
        self.assertEquals(str(res[0]["lastLogon"]), "x")
 
468
        self.assertSidEquals("S-1-5-21-4231626423-2410014848-2360679739-552", 
 
469
                             res[0]["objectSid"])
 
470
        self.assertTrue("objectSid" in res[0])
 
471
        self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
 
472
        self.assertTrue(not "dnsHostName" in res[1])
 
473
        self.assertEquals(str(res[1]["lastLogon"]), "x")
 
474
        self.assertSidEquals("S-1-5-21-4231626423-2410014848-2360679739-552",
 
475
                             res[1]["objectSid"])
 
476
        self.assertTrue("objectSid" in res[1])
 
477
 
 
478
        # Search by generated attribute 
 
479
        # In most cases, this even works when the mapping is missing
 
480
        # a `convert_operator' by enumerating the remote db.
 
481
        res = self.ldb.search(expression="(primaryGroupID=512)", 
 
482
                           attrs=["dnsHostName", "lastLogon", "primaryGroupID"])
 
483
        self.assertEquals(len(res), 1)
 
484
        self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
 
485
        self.assertTrue(not "dnsHostName" in res[0])
 
486
        self.assertEquals(str(res[0]["lastLogon"]), "x")
 
487
        self.assertEquals(str(res[0]["primaryGroupID"]), "512")
 
488
 
 
489
        # TODO: There should actually be two results, A and X.  The
 
490
        # primaryGroupID of X seems to get corrupted somewhere, and the
 
491
        # objectSid isn't available during the generation of remote (!) data,
 
492
        # which can be observed with the following search.  Also note that Xs
 
493
        # objectSid seems to be fine in the previous search for objectSid... */
 
494
        #res = ldb.search(expression="(primaryGroupID=*)", NULL, ldb. SCOPE_DEFAULT, attrs)
 
495
        #print len(res) + " results found"
 
496
        #for i in range(len(res)):
 
497
        #    for (obj in res[i]) {
 
498
        #        print obj + ": " + res[i][obj]
 
499
        #    }
 
500
        #    print "---"
 
501
        #    
 
502
 
 
503
        # Search by remote name of renamed attribute */
 
504
        res = self.ldb.search(expression="(sambaBadPasswordCount=*)", 
 
505
                              attrs=["dnsHostName", "lastLogon"])
 
506
        self.assertEquals(len(res), 0)
 
507
 
 
508
        # Search by objectClass
 
509
        attrs = ["dnsHostName", "lastLogon", "objectClass"]
 
510
        res = self.ldb.search(expression="(objectClass=user)", attrs=attrs)
 
511
        self.assertEquals(len(res), 2)
 
512
        self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X"))
 
513
        self.assertEquals(str(res[0]["dnsHostName"]), "x")
 
514
        self.assertEquals(str(res[0]["lastLogon"]), "x")
 
515
        self.assertEquals(str(res[0]["objectClass"][0]), "user")
 
516
        self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
 
517
        self.assertTrue(not "dnsHostName" in res[1])
 
518
        self.assertEquals(str(res[1]["lastLogon"]), "x")
 
519
        self.assertEquals(str(res[1]["objectClass"][0]), "user")
 
520
 
 
521
        # Prove that the objectClass is actually used for the search
 
522
        res = self.ldb.search(expression="(|(objectClass=user)(badPwdCount=x))",
 
523
                              attrs=attrs)
 
524
        self.assertEquals(len(res), 3)
 
525
        self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
 
526
        self.assertTrue(not "dnsHostName" in res[0])
 
527
        self.assertEquals(str(res[0]["lastLogon"]), "y")
 
528
        self.assertEquals(set(res[0]["objectClass"]), set(["top"]))
 
529
        self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
 
530
        self.assertEquals(str(res[1]["dnsHostName"]), "x")
 
531
        self.assertEquals(str(res[1]["lastLogon"]), "x")
 
532
        self.assertEquals(str(res[1]["objectClass"][0]), "user")
 
533
        self.assertEquals(str(res[2].dn), self.samba4.dn("cn=A"))
 
534
        self.assertTrue(not "dnsHostName" in res[2])
 
535
        self.assertEquals(str(res[2]["lastLogon"]), "x")
 
536
        self.assertEquals(res[2]["objectClass"][0], "user")
 
537
 
 
538
        # Testing search by parse tree
 
539
 
 
540
        # Search by conjunction of local attributes
 
541
        res = self.ldb.search(expression="(&(codePage=x)(revision=x))", 
 
542
                              attrs=["dnsHostName", "lastLogon"])
 
543
        self.assertEquals(len(res), 2)
 
544
        self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
 
545
        self.assertEquals(str(res[0]["dnsHostName"]), "y")
 
546
        self.assertEquals(str(res[0]["lastLogon"]), "y")
 
547
        self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
 
548
        self.assertEquals(str(res[1]["dnsHostName"]), "x")
 
549
        self.assertEquals(str(res[1]["lastLogon"]), "x")
 
550
 
 
551
        # Search by conjunction of remote attributes
 
552
        res = self.ldb.search(expression="(&(lastLogon=x)(description=x))", 
 
553
                              attrs=["dnsHostName", "lastLogon"])
 
554
        self.assertEquals(len(res), 2)
 
555
        self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X"))
 
556
        self.assertEquals(str(res[0]["dnsHostName"]), "x")
 
557
        self.assertEquals(str(res[0]["lastLogon"]), "x")
 
558
        self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
 
559
        self.assertTrue(not "dnsHostName" in res[1])
 
560
        self.assertEquals(str(res[1]["lastLogon"]), "x")
 
561
        
 
562
        # Search by conjunction of local and remote attribute 
 
563
        res = self.ldb.search(expression="(&(codePage=x)(description=x))", 
 
564
                              attrs=["dnsHostName", "lastLogon"])
 
565
        self.assertEquals(len(res), 2)
 
566
        self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
 
567
        self.assertEquals(str(res[0]["dnsHostName"]), "y")
 
568
        self.assertEquals(str(res[0]["lastLogon"]), "y")
 
569
        self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
 
570
        self.assertEquals(str(res[1]["dnsHostName"]), "x")
 
571
        self.assertEquals(str(res[1]["lastLogon"]), "x")
 
572
 
 
573
        # Search by conjunction of local and remote attribute w/o match
 
574
        attrs = ["dnsHostName", "lastLogon"]
 
575
        res = self.ldb.search(expression="(&(codePage=x)(nextRid=x))", 
 
576
                              attrs=attrs)
 
577
        self.assertEquals(len(res), 0)
 
578
        res = self.ldb.search(expression="(&(revision=x)(lastLogon=z))", 
 
579
                              attrs=attrs)
 
580
        self.assertEquals(len(res), 0)
 
581
 
 
582
        # Search by disjunction of local attributes
 
583
        res = self.ldb.search(expression="(|(revision=x)(dnsHostName=x))", 
 
584
                              attrs=["dnsHostName", "lastLogon"])
 
585
        self.assertEquals(len(res), 2)
 
586
        self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
 
587
        self.assertEquals(str(res[0]["dnsHostName"]), "y")
 
588
        self.assertEquals(str(res[0]["lastLogon"]), "y")
 
589
        self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
 
590
        self.assertEquals(str(res[1]["dnsHostName"]), "x")
 
591
        self.assertEquals(str(res[1]["lastLogon"]), "x")
 
592
 
 
593
        # Search by disjunction of remote attributes
 
594
        res = self.ldb.search(expression="(|(badPwdCount=x)(lastLogon=x))", 
 
595
                              attrs=["dnsHostName", "lastLogon"])
 
596
        self.assertEquals(len(res), 3)
 
597
        self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
 
598
        self.assertFalse("dnsHostName" in res[0])
 
599
        self.assertEquals(str(res[0]["lastLogon"]), "y")
 
600
        self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
 
601
        self.assertEquals(str(res[1]["dnsHostName"]), "x")
 
602
        self.assertEquals(str(res[1]["lastLogon"]), "x")
 
603
        self.assertEquals(str(res[2].dn), self.samba4.dn("cn=A"))
 
604
        self.assertFalse("dnsHostName" in res[2])
 
605
        self.assertEquals(str(res[2]["lastLogon"]), "x")
 
606
 
 
607
        # Search by disjunction of local and remote attribute
 
608
        res = self.ldb.search(expression="(|(revision=x)(lastLogon=y))", 
 
609
                              attrs=["dnsHostName", "lastLogon"])
 
610
        self.assertEquals(len(res), 3)
 
611
        self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
 
612
        self.assertEquals(str(res[0]["dnsHostName"]), "y")
 
613
        self.assertEquals(str(res[0]["lastLogon"]), "y")
 
614
        self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
 
615
        self.assertFalse("dnsHostName" in res[1])
 
616
        self.assertEquals(str(res[1]["lastLogon"]), "y")
 
617
        self.assertEquals(str(res[2].dn), self.samba4.dn("cn=X"))
 
618
        self.assertEquals(str(res[2]["dnsHostName"]), "x")
 
619
        self.assertEquals(str(res[2]["lastLogon"]), "x")
 
620
 
 
621
        # Search by disjunction of local and remote attribute w/o match
 
622
        res = self.ldb.search(expression="(|(codePage=y)(nextRid=z))", 
 
623
                              attrs=["dnsHostName", "lastLogon"])
 
624
        self.assertEquals(len(res), 0)
 
625
 
 
626
        # Search by negated local attribute
 
627
        res = self.ldb.search(expression="(!(revision=x))", 
 
628
                              attrs=["dnsHostName", "lastLogon"])
 
629
        self.assertEquals(len(res), 5)
 
630
        self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
 
631
        self.assertTrue(not "dnsHostName" in res[0])
 
632
        self.assertEquals(str(res[0]["lastLogon"]), "y")
 
633
        self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
 
634
        self.assertTrue(not "dnsHostName" in res[1])
 
635
        self.assertEquals(str(res[1]["lastLogon"]), "x")
 
636
        self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
 
637
        self.assertEquals(str(res[2]["dnsHostName"]), "z")
 
638
        self.assertEquals(str(res[2]["lastLogon"]), "z")
 
639
        self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
 
640
        self.assertTrue(not "dnsHostName" in res[3])
 
641
        self.assertEquals(str(res[3]["lastLogon"]), "z")
 
642
 
 
643
        # Search by negated remote attribute
 
644
        res = self.ldb.search(expression="(!(description=x))", 
 
645
                              attrs=["dnsHostName", "lastLogon"])
 
646
        self.assertEquals(len(res), 3)
 
647
        self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Z"))
 
648
        self.assertEquals(str(res[0]["dnsHostName"]), "z")
 
649
        self.assertEquals(str(res[0]["lastLogon"]), "z")
 
650
        self.assertEquals(str(res[1].dn), self.samba4.dn("cn=C"))
 
651
        self.assertTrue(not "dnsHostName" in res[1])
 
652
        self.assertEquals(str(res[1]["lastLogon"]), "z")
 
653
 
 
654
        # Search by negated conjunction of local attributes
 
655
        res = self.ldb.search(expression="(!(&(codePage=x)(revision=x)))", 
 
656
                              attrs=["dnsHostName", "lastLogon"])
 
657
        self.assertEquals(len(res), 5)
 
658
        self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
 
659
        self.assertTrue(not "dnsHostName" in res[0])
 
660
        self.assertEquals(str(res[0]["lastLogon"]), "y")
 
661
        self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
 
662
        self.assertTrue(not "dnsHostName" in res[1])
 
663
        self.assertEquals(str(res[1]["lastLogon"]), "x")
 
664
        self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
 
665
        self.assertEquals(str(res[2]["dnsHostName"]), "z")
 
666
        self.assertEquals(str(res[2]["lastLogon"]), "z")
 
667
        self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
 
668
        self.assertTrue(not "dnsHostName" in res[3])
 
669
        self.assertEquals(str(res[3]["lastLogon"]), "z")
 
670
 
 
671
        # Search by negated conjunction of remote attributes
 
672
        res = self.ldb.search(expression="(!(&(lastLogon=x)(description=x)))", 
 
673
                              attrs=["dnsHostName", "lastLogon"])
 
674
        self.assertEquals(len(res), 5)
 
675
        self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
 
676
        self.assertEquals(str(res[0]["dnsHostName"]), "y")
 
677
        self.assertEquals(str(res[0]["lastLogon"]), "y")
 
678
        self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
 
679
        self.assertTrue(not "dnsHostName" in res[1])
 
680
        self.assertEquals(str(res[1]["lastLogon"]), "y")
 
681
        self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
 
682
        self.assertEquals(str(res[2]["dnsHostName"]), "z")
 
683
        self.assertEquals(str(res[2]["lastLogon"]), "z")
 
684
        self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
 
685
        self.assertTrue(not "dnsHostName" in res[3])
 
686
        self.assertEquals(str(res[3]["lastLogon"]), "z")
 
687
 
 
688
        # Search by negated conjunction of local and remote attribute
 
689
        res = self.ldb.search(expression="(!(&(codePage=x)(description=x)))", 
 
690
                              attrs=["dnsHostName", "lastLogon"])
 
691
        self.assertEquals(len(res), 5)
 
692
        self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
 
693
        self.assertTrue(not "dnsHostName" in res[0])
 
694
        self.assertEquals(str(res[0]["lastLogon"]), "y")
 
695
        self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
 
696
        self.assertTrue(not "dnsHostName" in res[1])
 
697
        self.assertEquals(str(res[1]["lastLogon"]), "x")
 
698
        self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
 
699
        self.assertEquals(str(res[2]["dnsHostName"]), "z")
 
700
        self.assertEquals(str(res[2]["lastLogon"]), "z")
 
701
        self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
 
702
        self.assertTrue(not "dnsHostName" in res[3])
 
703
        self.assertEquals(str(res[3]["lastLogon"]), "z")
 
704
 
 
705
        # Search by negated disjunction of local attributes
 
706
        res = self.ldb.search(expression="(!(|(revision=x)(dnsHostName=x)))", 
 
707
                              attrs=["dnsHostName", "lastLogon"])
 
708
        self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
 
709
        self.assertTrue(not "dnsHostName" in res[0])
 
710
        self.assertEquals(str(res[0]["lastLogon"]), "y")
 
711
        self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
 
712
        self.assertTrue(not "dnsHostName" in res[1])
 
713
        self.assertEquals(str(res[1]["lastLogon"]), "x")
 
714
        self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
 
715
        self.assertEquals(str(res[2]["dnsHostName"]), "z")
 
716
        self.assertEquals(str(res[2]["lastLogon"]), "z")
 
717
        self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
 
718
        self.assertTrue(not "dnsHostName" in res[3])
 
719
        self.assertEquals(str(res[3]["lastLogon"]), "z")
 
720
 
 
721
        # Search by negated disjunction of remote attributes
 
722
        res = self.ldb.search(expression="(!(|(badPwdCount=x)(lastLogon=x)))", 
 
723
                              attrs=["dnsHostName", "lastLogon"])
 
724
        self.assertEquals(len(res), 4)
 
725
        self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
 
726
        self.assertEquals(str(res[0]["dnsHostName"]), "y")
 
727
        self.assertEquals(str(res[0]["lastLogon"]), "y")
 
728
        self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Z"))
 
729
        self.assertEquals(str(res[1]["dnsHostName"]), "z")
 
730
        self.assertEquals(str(res[1]["lastLogon"]), "z")
 
731
        self.assertEquals(str(res[2].dn), self.samba4.dn("cn=C"))
 
732
        self.assertTrue(not "dnsHostName" in res[2])
 
733
        self.assertEquals(str(res[2]["lastLogon"]), "z")
 
734
 
 
735
        # Search by negated disjunction of local and remote attribute
 
736
        res = self.ldb.search(expression="(!(|(revision=x)(lastLogon=y)))", 
 
737
                              attrs=["dnsHostName", "lastLogon"])
 
738
        self.assertEquals(len(res), 4)
 
739
        self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
 
740
        self.assertTrue(not "dnsHostName" in res[0])
 
741
        self.assertEquals(str(res[0]["lastLogon"]), "x")
 
742
        self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Z"))
 
743
        self.assertEquals(str(res[1]["dnsHostName"]), "z")
 
744
        self.assertEquals(str(res[1]["lastLogon"]), "z")
 
745
        self.assertEquals(str(res[2].dn), self.samba4.dn("cn=C"))
 
746
        self.assertTrue(not "dnsHostName" in res[2])
 
747
        self.assertEquals(str(res[2]["lastLogon"]), "z")
 
748
 
 
749
        # Search by complex parse tree
 
750
        res = self.ldb.search(expression="(|(&(revision=x)(dnsHostName=x))(!(&(description=x)(nextRid=y)))(badPwdCount=y))", attrs=["dnsHostName", "lastLogon"])
 
751
        self.assertEquals(len(res), 6)
 
752
        self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
 
753
        self.assertTrue(not "dnsHostName" in res[0])
 
754
        self.assertEquals(str(res[0]["lastLogon"]), "y")
 
755
        self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
 
756
        self.assertEquals(str(res[1]["dnsHostName"]), "x")
 
757
        self.assertEquals(str(res[1]["lastLogon"]), "x")
 
758
        self.assertEquals(str(res[2].dn), self.samba4.dn("cn=A"))
 
759
        self.assertTrue(not "dnsHostName" in res[2])
 
760
        self.assertEquals(str(res[2]["lastLogon"]), "x")
 
761
        self.assertEquals(str(res[3].dn), self.samba4.dn("cn=Z"))
 
762
        self.assertEquals(str(res[3]["dnsHostName"]), "z")
 
763
        self.assertEquals(str(res[3]["lastLogon"]), "z")
 
764
        self.assertEquals(str(res[4].dn), self.samba4.dn("cn=C"))
 
765
        self.assertTrue(not "dnsHostName" in res[4])
 
766
        self.assertEquals(str(res[4]["lastLogon"]), "z")
 
767
 
 
768
        # Clean up
 
769
        dns = [self.samba4.dn("cn=%s" % n) for n in ["A","B","C","X","Y","Z"]]
 
770
        for dn in dns:
 
771
            self.ldb.delete(dn)
 
772
 
 
773
    def test_map_modify_local(self):
 
774
        """Modification of local records."""
 
775
        # Add local record
 
776
        dn = "cn=test,dc=idealx,dc=org"
 
777
        self.ldb.add({"dn": dn, 
 
778
                 "cn": "test",
 
779
                 "foo": "bar",
 
780
                 "revision": "1",
 
781
                 "description": "test"})
 
782
        # Check it's there
 
783
        attrs = ["foo", "revision", "description"]
 
784
        res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
 
785
        self.assertEquals(len(res), 1)
 
786
        self.assertEquals(str(res[0].dn), dn)
 
787
        self.assertEquals(str(res[0]["foo"]), "bar")
 
788
        self.assertEquals(str(res[0]["revision"]), "1")
 
789
        self.assertEquals(str(res[0]["description"]), "test")
 
790
        # Check it's not in the local db
 
791
        res = self.samba4.db.search(expression="(cn=test)", 
 
792
                                    scope=SCOPE_DEFAULT, attrs=attrs)
 
793
        self.assertEquals(len(res), 0)
 
794
        # Check it's not in the remote db
 
795
        res = self.samba3.db.search(expression="(cn=test)", 
 
796
                                    scope=SCOPE_DEFAULT, attrs=attrs)
 
797
        self.assertEquals(len(res), 0)
 
798
 
 
799
        # Modify local record
 
800
        ldif = """
 
801
dn: """ + dn + """
 
802
replace: foo
 
803
foo: baz
 
804
replace: description
 
805
description: foo
 
806
"""
 
807
        self.ldb.modify_ldif(ldif)
 
808
        # Check in local db
 
809
        res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
 
810
        self.assertEquals(len(res), 1)
 
811
        self.assertEquals(str(res[0].dn), dn)
 
812
        self.assertEquals(str(res[0]["foo"]), "baz")
 
813
        self.assertEquals(str(res[0]["revision"]), "1")
 
814
        self.assertEquals(str(res[0]["description"]), "foo")
 
815
 
 
816
        # Rename local record
 
817
        dn2 = "cn=toast,dc=idealx,dc=org"
 
818
        self.ldb.rename(dn, dn2)
 
819
        # Check in local db
 
820
        res = self.ldb.search(dn2, scope=SCOPE_BASE, attrs=attrs)
 
821
        self.assertEquals(len(res), 1)
 
822
        self.assertEquals(str(res[0].dn), dn2)
 
823
        self.assertEquals(str(res[0]["foo"]), "baz")
 
824
        self.assertEquals(str(res[0]["revision"]), "1")
 
825
        self.assertEquals(str(res[0]["description"]), "foo")
 
826
 
 
827
        # Delete local record
 
828
        self.ldb.delete(dn2)
 
829
        # Check it's gone
 
830
        res = self.ldb.search(dn2, scope=SCOPE_BASE)
 
831
        self.assertEquals(len(res), 0)
 
832
 
 
833
    def test_map_modify_remote_remote(self):
 
834
        """Modification of remote data of remote records"""
 
835
        # Add remote record
 
836
        dn = self.samba4.dn("cn=test")
 
837
        dn2 = self.samba3.dn("cn=test")
 
838
        self.samba3.db.add({"dn": dn2, 
 
839
                   "cn": "test",
 
840
                   "description": "foo",
 
841
                   "sambaBadPasswordCount": "3",
 
842
                   "sambaNextRid": "1001"})
 
843
        # Check it's there
 
844
        res = self.samba3.db.search(dn2, scope=SCOPE_BASE, 
 
845
                attrs=["description", "sambaBadPasswordCount", "sambaNextRid"])
 
846
        self.assertEquals(len(res), 1)
 
847
        self.assertEquals(str(res[0].dn), dn2)
 
848
        self.assertEquals(str(res[0]["description"]), "foo")
 
849
        self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "3")
 
850
        self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
 
851
        # Check in mapped db
 
852
        attrs = ["description", "badPwdCount", "nextRid"]
 
853
        res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs, expression="")
 
854
        self.assertEquals(len(res), 1)
 
855
        self.assertEquals(str(res[0].dn), dn)
 
856
        self.assertEquals(str(res[0]["description"]), "foo")
 
857
        self.assertEquals(str(res[0]["badPwdCount"]), "3")
 
858
        self.assertEquals(str(res[0]["nextRid"]), "1001")
 
859
        # Check in local db
 
860
        res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
 
861
        self.assertEquals(len(res), 0)
 
862
 
 
863
        # Modify remote data of remote record
 
864
        ldif = """
 
865
dn: """ + dn + """
 
866
replace: description
 
867
description: test
 
868
replace: badPwdCount
 
869
badPwdCount: 4
 
870
"""
 
871
        self.ldb.modify_ldif(ldif)
 
872
        # Check in mapped db
 
873
        res = self.ldb.search(dn, scope=SCOPE_BASE, 
 
874
                attrs=["description", "badPwdCount", "nextRid"])
 
875
        self.assertEquals(len(res), 1)
 
876
        self.assertEquals(str(res[0].dn), dn)
 
877
        self.assertEquals(str(res[0]["description"]), "test")
 
878
        self.assertEquals(str(res[0]["badPwdCount"]), "4")
 
879
        self.assertEquals(str(res[0]["nextRid"]), "1001")
 
880
        # Check in remote db
 
881
        res = self.samba3.db.search(dn2, scope=SCOPE_BASE, 
 
882
                attrs=["description", "sambaBadPasswordCount", "sambaNextRid"])
 
883
        self.assertEquals(len(res), 1)
 
884
        self.assertEquals(str(res[0].dn), dn2)
 
885
        self.assertEquals(str(res[0]["description"]), "test")
 
886
        self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "4")
 
887
        self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
 
888
 
 
889
        # Rename remote record
 
890
        dn2 = self.samba4.dn("cn=toast")
 
891
        self.ldb.rename(dn, dn2)
 
892
        # Check in mapped db
 
893
        dn = dn2
 
894
        res = self.ldb.search(dn, scope=SCOPE_BASE, 
 
895
                attrs=["description", "badPwdCount", "nextRid"])
 
896
        self.assertEquals(len(res), 1)
 
897
        self.assertEquals(str(res[0].dn), dn)
 
898
        self.assertEquals(str(res[0]["description"]), "test")
 
899
        self.assertEquals(str(res[0]["badPwdCount"]), "4")
 
900
        self.assertEquals(str(res[0]["nextRid"]), "1001")
 
901
        # Check in remote db 
 
902
        dn2 = self.samba3.dn("cn=toast")
 
903
        res = self.samba3.db.search(dn2, scope=SCOPE_BASE, 
 
904
                attrs=["description", "sambaBadPasswordCount", "sambaNextRid"])
 
905
        self.assertEquals(len(res), 1)
 
906
        self.assertEquals(str(res[0].dn), dn2)
 
907
        self.assertEquals(str(res[0]["description"]), "test")
 
908
        self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "4")
 
909
        self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
 
910
 
 
911
        # Delete remote record
 
912
        self.ldb.delete(dn)
 
913
        # Check in mapped db that it's removed
 
914
        res = self.ldb.search(dn, scope=SCOPE_BASE)
 
915
        self.assertEquals(len(res), 0)
 
916
        # Check in remote db
 
917
        res = self.samba3.db.search(dn2, scope=SCOPE_BASE)
 
918
        self.assertEquals(len(res), 0)
 
919
 
 
920
    def test_map_modify_remote_local(self):
 
921
        """Modification of local data of remote records"""
 
922
        # Add remote record (same as before)
 
923
        dn = self.samba4.dn("cn=test")
 
924
        dn2 = self.samba3.dn("cn=test")
 
925
        self.samba3.db.add({"dn": dn2, 
 
926
                   "cn": "test",
 
927
                   "description": "foo",
 
928
                   "sambaBadPasswordCount": "3",
 
929
                   "sambaNextRid": "1001"})
 
930
 
 
931
        # Modify local data of remote record
 
932
        ldif = """
 
933
dn: """ + dn + """
 
934
add: revision
 
935
revision: 1
 
936
replace: description
 
937
description: test
 
938
 
 
939
"""
 
940
        self.ldb.modify_ldif(ldif)
 
941
        # Check in mapped db
 
942
        attrs = ["revision", "description"]
 
943
        res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
 
944
        self.assertEquals(len(res), 1)
 
945
        self.assertEquals(str(res[0].dn), dn)
 
946
        self.assertEquals(str(res[0]["description"]), "test")
 
947
        self.assertEquals(str(res[0]["revision"]), "1")
 
948
        # Check in remote db
 
949
        res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs)
 
950
        self.assertEquals(len(res), 1)
 
951
        self.assertEquals(str(res[0].dn), dn2)
 
952
        self.assertEquals(str(res[0]["description"]), "test")
 
953
        self.assertTrue(not "revision" in res[0])
 
954
        # Check in local db
 
955
        res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
 
956
        self.assertEquals(len(res), 1)
 
957
        self.assertEquals(str(res[0].dn), dn)
 
958
        self.assertTrue(not "description" in res[0])
 
959
        self.assertEquals(str(res[0]["revision"]), "1")
 
960
 
 
961
        # Delete (newly) split record
 
962
        self.ldb.delete(dn)
 
963
 
 
964
    def test_map_modify_split(self):
 
965
        """Testing modification of split records"""
 
966
        # Add split record
 
967
        dn = self.samba4.dn("cn=test")
 
968
        dn2 = self.samba3.dn("cn=test")
 
969
        self.ldb.add({
 
970
            "dn": dn,
 
971
            "cn": "test",
 
972
            "description": "foo",
 
973
            "badPwdCount": "3",
 
974
            "nextRid": "1001",
 
975
            "revision": "1"})
 
976
        # Check it's there
 
977
        attrs = ["description", "badPwdCount", "nextRid", "revision"]
 
978
        res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
 
979
        self.assertEquals(len(res), 1)
 
980
        self.assertEquals(str(res[0].dn), dn)
 
981
        self.assertEquals(str(res[0]["description"]), "foo")
 
982
        self.assertEquals(str(res[0]["badPwdCount"]), "3")
 
983
        self.assertEquals(str(res[0]["nextRid"]), "1001")
 
984
        self.assertEquals(str(res[0]["revision"]), "1")
 
985
        # Check in local db
 
986
        res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
 
987
        self.assertEquals(len(res), 1)
 
988
        self.assertEquals(str(res[0].dn), dn)
 
989
        self.assertTrue(not "description" in res[0])
 
990
        self.assertTrue(not "badPwdCount" in res[0])
 
991
        self.assertTrue(not "nextRid" in res[0])
 
992
        self.assertEquals(str(res[0]["revision"]), "1")
 
993
        # Check in remote db
 
994
        attrs = ["description", "sambaBadPasswordCount", "sambaNextRid", 
 
995
                 "revision"]
 
996
        res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs)
 
997
        self.assertEquals(len(res), 1)
 
998
        self.assertEquals(str(res[0].dn), dn2)
 
999
        self.assertEquals(str(res[0]["description"]), "foo")
 
1000
        self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "3")
 
1001
        self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
 
1002
        self.assertTrue(not "revision" in res[0])
 
1003
 
 
1004
        # Modify of split record
 
1005
        ldif = """
 
1006
dn: """ + dn + """
 
1007
replace: description
 
1008
description: test
 
1009
replace: badPwdCount
 
1010
badPwdCount: 4
 
1011
replace: revision
 
1012
revision: 2
 
1013
"""
 
1014
        self.ldb.modify_ldif(ldif)
 
1015
        # Check in mapped db
 
1016
        attrs = ["description", "badPwdCount", "nextRid", "revision"]
 
1017
        res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
 
1018
        self.assertEquals(len(res), 1)
 
1019
        self.assertEquals(str(res[0].dn), dn)
 
1020
        self.assertEquals(str(res[0]["description"]), "test")
 
1021
        self.assertEquals(str(res[0]["badPwdCount"]), "4")
 
1022
        self.assertEquals(str(res[0]["nextRid"]), "1001")
 
1023
        self.assertEquals(str(res[0]["revision"]), "2")
 
1024
        # Check in local db
 
1025
        res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
 
1026
        self.assertEquals(len(res), 1)
 
1027
        self.assertEquals(str(res[0].dn), dn)
 
1028
        self.assertTrue(not "description" in res[0])
 
1029
        self.assertTrue(not "badPwdCount" in res[0])
 
1030
        self.assertTrue(not "nextRid" in res[0])
 
1031
        self.assertEquals(str(res[0]["revision"]), "2")
 
1032
        # Check in remote db
 
1033
        attrs = ["description", "sambaBadPasswordCount", "sambaNextRid", 
 
1034
                 "revision"]
 
1035
        res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs)
 
1036
        self.assertEquals(len(res), 1)
 
1037
        self.assertEquals(str(res[0].dn), dn2)
 
1038
        self.assertEquals(str(res[0]["description"]), "test")
 
1039
        self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "4")
 
1040
        self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
 
1041
        self.assertTrue(not "revision" in res[0])
 
1042
 
 
1043
        # Rename split record
 
1044
        dn2 = self.samba4.dn("cn=toast")
 
1045
        self.ldb.rename(dn, dn2)
 
1046
        # Check in mapped db
 
1047
        dn = dn2
 
1048
        attrs = ["description", "badPwdCount", "nextRid", "revision"]
 
1049
        res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
 
1050
        self.assertEquals(len(res), 1)
 
1051
        self.assertEquals(str(res[0].dn), dn)
 
1052
        self.assertEquals(str(res[0]["description"]), "test")
 
1053
        self.assertEquals(str(res[0]["badPwdCount"]), "4")
 
1054
        self.assertEquals(str(res[0]["nextRid"]), "1001")
 
1055
        self.assertEquals(str(res[0]["revision"]), "2")
 
1056
        # Check in local db
 
1057
        res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
 
1058
        self.assertEquals(len(res), 1)
 
1059
        self.assertEquals(str(res[0].dn), dn)
 
1060
        self.assertTrue(not "description" in res[0])
 
1061
        self.assertTrue(not "badPwdCount" in res[0])
 
1062
        self.assertTrue(not "nextRid" in res[0])
 
1063
        self.assertEquals(str(res[0]["revision"]), "2")
 
1064
        # Check in remote db
 
1065
        dn2 = self.samba3.dn("cn=toast")
 
1066
        res = self.samba3.db.search(dn2, scope=SCOPE_BASE, 
 
1067
          attrs=["description", "sambaBadPasswordCount", "sambaNextRid", 
 
1068
                 "revision"])
 
1069
        self.assertEquals(len(res), 1)
 
1070
        self.assertEquals(str(res[0].dn), dn2)
 
1071
        self.assertEquals(str(res[0]["description"]), "test")
 
1072
        self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "4")
 
1073
        self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
 
1074
        self.assertTrue(not "revision" in res[0])
 
1075
 
 
1076
        # Delete split record
 
1077
        self.ldb.delete(dn)
 
1078
        # Check in mapped db
 
1079
        res = self.ldb.search(dn, scope=SCOPE_BASE)
 
1080
        self.assertEquals(len(res), 0)
 
1081
        # Check in local db
 
1082
        res = self.samba4.db.search(dn, scope=SCOPE_BASE)
 
1083
        self.assertEquals(len(res), 0)
 
1084
        # Check in remote db
 
1085
        res = self.samba3.db.search(dn2, scope=SCOPE_BASE)
 
1086
        self.assertEquals(len(res), 0)