~ubuntu-branches/ubuntu/wily/nss-pam-ldapd/wily-proposed

« back to all changes in this revision

Viewing changes to tests/test_pynslcd_cache.py

  • Committer: Package Import Robot
  • Author(s): Arthur de Jong
  • Date: 2014-06-08 14:00:00 UTC
  • mfrom: (16.1.8) (14.1.12 experimental)
  • Revision ID: package-import@ubuntu.com-20140608140000-rt6fspljmk9252zd
Tags: 0.9.4-1
* upload to unstable
* new upstream release:
  - also handle password policy information on BIND failure (this makes it
    possible to distinguish between a wrong password and an expired
    password)
  - fix mapping the member attribute to an empty string
  - any buffers that may have held passwords are cleared before the memory
    is released
  - increase buffer size for passwords to support extremely long passwords
    (thanks ushi)
  - increase buffer size for DN to support very long names or names with
    non-ASCII characters
  - log an error in almost all places where a defined buffer is not large
    enough to hold the provided data instead of just (sometimes silently)
    failing
  - logging improvements (start-up problems, login failures)
* add signature checking option to watch file
* add a debian/upstream/metadata file

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#!/usr/bin/env python
 
2
 
 
3
# test_pynslcd_cache.py - tests for the pynslcd caching functionality
 
4
#
 
5
# Copyright (C) 2013 Arthur de Jong
 
6
#
 
7
# This library is free software; you can redistribute it and/or
 
8
# modify it under the terms of the GNU Lesser General Public
 
9
# License as published by the Free Software Foundation; either
 
10
# version 2.1 of the License, or (at your option) any later version.
 
11
#
 
12
# This library is distributed in the hope that it will be useful,
 
13
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
14
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 
15
# Lesser General Public License for more details.
 
16
#
 
17
# You should have received a copy of the GNU Lesser General Public
 
18
# License along with this library; if not, write to the Free Software
 
19
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 
20
# 02110-1301 USA
 
21
 
 
22
import os
 
23
import os.path
 
24
import sys
 
25
import unittest
 
26
 
 
27
# fix the Python path
 
28
sys.path.insert(1, os.path.abspath(os.path.join(sys.path[0], '..', 'pynslcd')))
 
29
sys.path.insert(2, os.path.abspath(os.path.join('..', 'pynslcd')))
 
30
 
 
31
 
 
32
# TODO: think about case-sesitivity of cache searches (have tests for that)
 
33
 
 
34
 
 
35
class TestAlias(unittest.TestCase):
 
36
 
 
37
    def setUp(self):
 
38
        import alias
 
39
        cache = alias.Cache()
 
40
        cache.store('alias1', ['member1', 'member2'])
 
41
        cache.store('alias2', ['member1', 'member3'])
 
42
        cache.store('alias3', [])
 
43
        self.cache = cache
 
44
 
 
45
    def test_by_name(self):
 
46
        self.assertItemsEqual(self.cache.retrieve(dict(cn='alias1')), [
 
47
            ['alias1', ['member1', 'member2']],
 
48
        ])
 
49
 
 
50
    def test_by_member(self):
 
51
        self.assertItemsEqual(self.cache.retrieve(dict(rfc822MailMember='member1')), [
 
52
            ['alias1', ['member1', 'member2']],
 
53
            ['alias2', ['member1', 'member3']],
 
54
        ])
 
55
 
 
56
    def test_all(self):
 
57
        self.assertItemsEqual(self.cache.retrieve({}), [
 
58
            ['alias1', ['member1', 'member2']],
 
59
            ['alias2', ['member1', 'member3']],
 
60
            ['alias3', []],
 
61
        ])
 
62
 
 
63
 
 
64
class TestEther(unittest.TestCase):
 
65
 
 
66
    def setUp(self):
 
67
        import ether
 
68
        cache = ether.Cache()
 
69
        cache.store('name1', '0:18:8a:54:1a:11')
 
70
        cache.store('name2', '0:18:8a:54:1a:22')
 
71
        self.cache = cache
 
72
 
 
73
    def test_by_name(self):
 
74
        self.assertItemsEqual(self.cache.retrieve(dict(cn='name1')), [
 
75
            ['name1', '0:18:8a:54:1a:11'],
 
76
        ])
 
77
        self.assertItemsEqual(self.cache.retrieve(dict(cn='name2')), [
 
78
            ['name2', '0:18:8a:54:1a:22'],
 
79
        ])
 
80
 
 
81
    def test_by_ether(self):
 
82
        # ideally we should also support alternate representations
 
83
        self.assertItemsEqual(self.cache.retrieve(dict(macAddress='0:18:8a:54:1a:22')), [
 
84
            ['name2', '0:18:8a:54:1a:22'],
 
85
        ])
 
86
 
 
87
    def test_all(self):
 
88
        self.assertItemsEqual(self.cache.retrieve({}), [
 
89
            ['name1', '0:18:8a:54:1a:11'],
 
90
            ['name2', '0:18:8a:54:1a:22'],
 
91
        ])
 
92
 
 
93
 
 
94
class TestGroup(unittest.TestCase):
 
95
 
 
96
    def setUp(self):
 
97
        import group
 
98
        cache = group.Cache()
 
99
        cache.store('group1', 'pass1', 10, ['user1', 'user2'])
 
100
        cache.store('group2', 'pass2', 20, ['user1', 'user2', 'user3'])
 
101
        cache.store('group3', 'pass3', 30, [])
 
102
        cache.store('group4', 'pass4', 40, ['user2', ])
 
103
        self.cache = cache
 
104
 
 
105
    def test_by_name(self):
 
106
        self.assertItemsEqual(self.cache.retrieve(dict(cn='group1')), [
 
107
            ['group1', 'pass1', 10, ['user1', 'user2']],
 
108
        ])
 
109
        self.assertItemsEqual(self.cache.retrieve(dict(cn='group3')), [
 
110
            ['group3', 'pass3', 30, []],
 
111
        ])
 
112
 
 
113
    def test_by_gid(self):
 
114
        self.assertItemsEqual(self.cache.retrieve(dict(gidNumber=10)), [
 
115
            ['group1', 'pass1', 10, ['user1', 'user2']],
 
116
        ])
 
117
        self.assertItemsEqual(self.cache.retrieve(dict(gidNumber=40)), [
 
118
            ['group4', 'pass4', 40, ['user2']],
 
119
        ])
 
120
 
 
121
    def test_all(self):
 
122
        self.assertItemsEqual(self.cache.retrieve({}), [
 
123
            ['group1', 'pass1', 10, ['user1', 'user2']],
 
124
            ['group2', 'pass2', 20, ['user1', 'user2', 'user3']],
 
125
            ['group3', 'pass3', 30, []],
 
126
            ['group4', 'pass4', 40, ['user2']],
 
127
        ])
 
128
 
 
129
    def test_bymember(self):
 
130
        self.assertItemsEqual(self.cache.retrieve(dict(memberUid='user1')), [
 
131
            ['group1', 'pass1', 10, ['user1', 'user2']],
 
132
            ['group2', 'pass2', 20, ['user1', 'user2', 'user3']],
 
133
        ])
 
134
        self.assertItemsEqual(self.cache.retrieve(dict(memberUid='user2')), [
 
135
            ['group1', 'pass1', 10, ['user1', 'user2']],
 
136
            ['group2', 'pass2', 20, ['user1', 'user2', 'user3']],
 
137
            ['group4', 'pass4', 40, ['user2']],
 
138
        ])
 
139
        self.assertItemsEqual(self.cache.retrieve(dict(memberUid='user3')), [
 
140
            ['group2', 'pass2', 20, ['user1', 'user2', 'user3']],
 
141
        ])
 
142
 
 
143
 
 
144
class TestHost(unittest.TestCase):
 
145
 
 
146
    def setUp(self):
 
147
        import host
 
148
        cache = host.Cache()
 
149
        cache.store('hostname1', [], ['127.0.0.1', ])
 
150
        cache.store('hostname2', ['alias1', 'alias2'], ['127.0.0.2', '127.0.0.3'])
 
151
        self.cache = cache
 
152
 
 
153
    def test_by_name(self):
 
154
        self.assertItemsEqual(self.cache.retrieve(dict(cn='hostname1')), [
 
155
            ['hostname1', [], ['127.0.0.1']],
 
156
        ])
 
157
        self.assertItemsEqual(self.cache.retrieve(dict(cn='hostname2')), [
 
158
            ['hostname2', ['alias1', 'alias2'], ['127.0.0.2', '127.0.0.3']],
 
159
        ])
 
160
 
 
161
    def test_by_alias(self):
 
162
        self.assertItemsEqual(self.cache.retrieve(dict(cn='alias1')), [
 
163
            ['hostname2', ['alias1', 'alias2'], ['127.0.0.2', '127.0.0.3']],
 
164
        ])
 
165
        self.assertItemsEqual(self.cache.retrieve(dict(cn='alias2')), [
 
166
            ['hostname2', ['alias1', 'alias2'], ['127.0.0.2', '127.0.0.3']],
 
167
        ])
 
168
 
 
169
    def test_by_address(self):
 
170
        self.assertItemsEqual(self.cache.retrieve(dict(ipHostNumber='127.0.0.3')), [
 
171
            ['hostname2', ['alias1', 'alias2'], ['127.0.0.2', '127.0.0.3']],
 
172
        ])
 
173
 
 
174
 
 
175
class TestNetgroup(unittest.TestCase):
 
176
 
 
177
    def setUp(self):
 
178
        import netgroup
 
179
        cache = netgroup.Cache()
 
180
        cache.store('netgroup1', ['(host1, user1,)', '(host1, user2,)', '(host2, user1,)'], ['netgroup2', ])
 
181
        cache.store('netgroup2', ['(host3, user1,)', '(host3, user3,)'], [])
 
182
        self.cache = cache
 
183
 
 
184
    def test_by_name(self):
 
185
        self.assertItemsEqual(self.cache.retrieve(dict(cn='netgroup1')), [
 
186
            ['netgroup1', ['(host1, user1,)', '(host1, user2,)', '(host2, user1,)'], ['netgroup2', ]],
 
187
        ])
 
188
        self.assertItemsEqual(self.cache.retrieve(dict(cn='netgroup2')), [
 
189
            ['netgroup2', ['(host3, user1,)', '(host3, user3,)'], []],
 
190
        ])
 
191
 
 
192
 
 
193
class TestNetwork(unittest.TestCase):
 
194
 
 
195
    def setUp(self):
 
196
        import network
 
197
        cache = network.Cache()
 
198
        cache.store('networkname1', [], ['127.0.0.1', ])
 
199
        cache.store('networkname2', ['alias1', 'alias2'], ['127.0.0.2', '127.0.0.3'])
 
200
        self.cache = cache
 
201
 
 
202
    def test_by_name(self):
 
203
        self.assertItemsEqual(self.cache.retrieve(dict(cn='networkname1')), [
 
204
            ['networkname1', [], ['127.0.0.1']],
 
205
        ])
 
206
        self.assertItemsEqual(self.cache.retrieve(dict(cn='networkname2')), [
 
207
            ['networkname2', ['alias1', 'alias2'], ['127.0.0.2', '127.0.0.3']],
 
208
        ])
 
209
 
 
210
    def test_by_alias(self):
 
211
        self.assertItemsEqual(self.cache.retrieve(dict(cn='alias1')), [
 
212
            ['networkname2', ['alias1', 'alias2'], ['127.0.0.2', '127.0.0.3']],
 
213
        ])
 
214
        self.assertItemsEqual(self.cache.retrieve(dict(cn='alias2')), [
 
215
            ['networkname2', ['alias1', 'alias2'], ['127.0.0.2', '127.0.0.3']],
 
216
        ])
 
217
 
 
218
    def test_by_address(self):
 
219
        self.assertItemsEqual(self.cache.retrieve(dict(ipNetworkNumber='127.0.0.3')), [
 
220
            ['networkname2', ['alias1', 'alias2'], ['127.0.0.2', '127.0.0.3']],
 
221
        ])
 
222
 
 
223
 
 
224
class TestPasswd(unittest.TestCase):
 
225
 
 
226
    def setUp(self):
 
227
        import passwd
 
228
        cache = passwd.Cache()
 
229
        cache.store('name', 'passwd', 100, 200, 'gecos', '/home/user', '/bin/bash')
 
230
        cache.store('name2', 'passwd2', 101, 202, 'gecos2', '/home/user2', '/bin/bash')
 
231
        self.cache = cache
 
232
 
 
233
    def test_by_name(self):
 
234
        self.assertItemsEqual(self.cache.retrieve(dict(uid='name')), [
 
235
            [u'name', u'passwd', 100, 200, u'gecos', u'/home/user', u'/bin/bash'],
 
236
        ])
 
237
 
 
238
    def test_by_unknown_name(self):
 
239
        self.assertItemsEqual(self.cache.retrieve(dict(uid='notfound')), [])
 
240
 
 
241
    def test_by_number(self):
 
242
        self.assertItemsEqual(self.cache.retrieve(dict(uidNumber=100)), [
 
243
            [u'name', u'passwd', 100, 200, u'gecos', u'/home/user', u'/bin/bash'],
 
244
        ])
 
245
        self.assertItemsEqual(self.cache.retrieve(dict(uidNumber=101)), [
 
246
            ['name2', 'passwd2', 101, 202, 'gecos2', '/home/user2', '/bin/bash'],
 
247
        ])
 
248
 
 
249
    def test_all(self):
 
250
        self.assertItemsEqual(self.cache.retrieve({}), [
 
251
            [u'name', u'passwd', 100, 200, u'gecos', u'/home/user', u'/bin/bash'],
 
252
            [u'name2', u'passwd2', 101, 202, u'gecos2', u'/home/user2', u'/bin/bash'],
 
253
        ])
 
254
 
 
255
 
 
256
class TestProtocol(unittest.TestCase):
 
257
 
 
258
    def setUp(self):
 
259
        import protocol
 
260
        cache = protocol.Cache()
 
261
        cache.store('protocol1', ['alias1', 'alias2'], 100)
 
262
        cache.store('protocol2', ['alias3', ], 200)
 
263
        cache.store('protocol3', [], 300)
 
264
        self.cache = cache
 
265
 
 
266
    def test_by_name(self):
 
267
        self.assertItemsEqual(self.cache.retrieve(dict(cn='protocol1')), [
 
268
            ['protocol1', ['alias1', 'alias2'], 100],
 
269
        ])
 
270
        self.assertItemsEqual(self.cache.retrieve(dict(cn='protocol2')), [
 
271
            ['protocol2', ['alias3', ], 200],
 
272
        ])
 
273
        self.assertItemsEqual(self.cache.retrieve(dict(cn='protocol3')), [
 
274
            ['protocol3', [], 300],
 
275
        ])
 
276
 
 
277
    def test_by_unknown_name(self):
 
278
        self.assertItemsEqual(self.cache.retrieve(dict(cn='notfound')), [])
 
279
 
 
280
    def test_by_number(self):
 
281
        self.assertItemsEqual(self.cache.retrieve(dict(ipProtocolNumber=100)), [
 
282
            ['protocol1', ['alias1', 'alias2'], 100],
 
283
        ])
 
284
        self.assertItemsEqual(self.cache.retrieve(dict(ipProtocolNumber=200)), [
 
285
            ['protocol2', ['alias3', ], 200],
 
286
        ])
 
287
 
 
288
    def test_by_alias(self):
 
289
        self.assertItemsEqual(self.cache.retrieve(dict(cn='alias1')), [
 
290
            ['protocol1', ['alias1', 'alias2'], 100],
 
291
        ])
 
292
        self.assertItemsEqual(self.cache.retrieve(dict(cn='alias3')), [
 
293
            ['protocol2', ['alias3', ], 200],
 
294
        ])
 
295
 
 
296
    def test_all(self):
 
297
        self.assertItemsEqual(self.cache.retrieve({}), [
 
298
            ['protocol1', ['alias1', 'alias2'], 100],
 
299
            ['protocol2', ['alias3'], 200],
 
300
            ['protocol3', [], 300],
 
301
        ])
 
302
 
 
303
 
 
304
class TestRpc(unittest.TestCase):
 
305
 
 
306
    def setUp(self):
 
307
        import rpc
 
308
        cache = rpc.Cache()
 
309
        cache.store('rpc1', ['alias1', 'alias2'], 100)
 
310
        cache.store('rpc2', ['alias3', ], 200)
 
311
        cache.store('rpc3', [], 300)
 
312
        self.cache = cache
 
313
 
 
314
    def test_by_name(self):
 
315
        self.assertItemsEqual(self.cache.retrieve(dict(cn='rpc1')), [
 
316
            ['rpc1', ['alias1', 'alias2'], 100],
 
317
        ])
 
318
        self.assertItemsEqual(self.cache.retrieve(dict(cn='rpc2')), [
 
319
            ['rpc2', ['alias3', ], 200],
 
320
        ])
 
321
        self.assertItemsEqual(self.cache.retrieve(dict(cn='rpc3')), [
 
322
            ['rpc3', [], 300],
 
323
        ])
 
324
 
 
325
    def test_by_unknown_name(self):
 
326
        self.assertItemsEqual(self.cache.retrieve(dict(cn='notfound')), [])
 
327
 
 
328
    def test_by_number(self):
 
329
        self.assertItemsEqual(self.cache.retrieve(dict(oncRpcNumber=100)), [
 
330
            ['rpc1', ['alias1', 'alias2'], 100],
 
331
        ])
 
332
        self.assertItemsEqual(self.cache.retrieve(dict(oncRpcNumber=200)), [
 
333
            ['rpc2', ['alias3', ], 200],
 
334
        ])
 
335
 
 
336
    def test_by_alias(self):
 
337
        self.assertItemsEqual(self.cache.retrieve(dict(cn='alias1')), [
 
338
            ['rpc1', ['alias1', 'alias2'], 100],
 
339
        ])
 
340
        self.assertItemsEqual(self.cache.retrieve(dict(cn='alias3')), [
 
341
            ['rpc2', ['alias3', ], 200],
 
342
        ])
 
343
 
 
344
    def test_all(self):
 
345
        self.assertItemsEqual(self.cache.retrieve({}), [
 
346
            ['rpc1', ['alias1', 'alias2'], 100],
 
347
            ['rpc2', ['alias3'], 200],
 
348
            ['rpc3', [], 300],
 
349
        ])
 
350
 
 
351
 
 
352
class TestService(unittest.TestCase):
 
353
 
 
354
    def setUp(self):
 
355
        import service
 
356
        cache = service.Cache()
 
357
        cache.store('service1', ['alias1', 'alias2'], 100, 'tcp')
 
358
        cache.store('service1', ['alias1', 'alias2'], 100, 'udp')
 
359
        cache.store('service2', ['alias3', ], 200, 'udp')
 
360
        cache.store('service3', [], 300, 'udp')
 
361
        self.cache = cache
 
362
 
 
363
    def test_by_name(self):
 
364
        self.assertItemsEqual(self.cache.retrieve(dict(cn='service1')), [
 
365
            ['service1', ['alias1', 'alias2'], 100, 'tcp'],
 
366
            ['service1', ['alias1', 'alias2'], 100, 'udp'],
 
367
        ])
 
368
        self.assertItemsEqual(self.cache.retrieve(dict(cn='service2')), [
 
369
            ['service2', ['alias3', ], 200, 'udp'],
 
370
        ])
 
371
        self.assertItemsEqual(self.cache.retrieve(dict(cn='service3')), [
 
372
            ['service3', [], 300, 'udp'],
 
373
        ])
 
374
 
 
375
    def test_by_name_and_protocol(self):
 
376
        self.assertItemsEqual(self.cache.retrieve(dict(cn='service1', ipServiceProtocol='udp')), [
 
377
            ['service1', ['alias1', 'alias2'], 100, 'udp'],
 
378
        ])
 
379
        self.assertItemsEqual(self.cache.retrieve(dict(cn='service1', ipServiceProtocol='tcp')), [
 
380
            ['service1', ['alias1', 'alias2'], 100, 'tcp'],
 
381
        ])
 
382
        self.assertItemsEqual(self.cache.retrieve(dict(cn='service2', ipServiceProtocol='udp')), [
 
383
            ['service2', ['alias3', ], 200, 'udp'],
 
384
        ])
 
385
        self.assertItemsEqual(self.cache.retrieve(dict(cn='service2', ipServiceProtocol='tcp')), [
 
386
        ])
 
387
 
 
388
    def test_by_unknown_name(self):
 
389
        self.assertItemsEqual(self.cache.retrieve(dict(cn='notfound')), [])
 
390
 
 
391
    def test_by_number(self):
 
392
        self.assertItemsEqual(self.cache.retrieve(dict(ipServicePort=100)), [
 
393
            ['service1', ['alias1', 'alias2'], 100, 'tcp'],
 
394
            ['service1', ['alias1', 'alias2'], 100, 'udp'],
 
395
        ])
 
396
        self.assertItemsEqual(self.cache.retrieve(dict(ipServicePort=200)), [
 
397
            ['service2', ['alias3', ], 200, 'udp'],
 
398
        ])
 
399
 
 
400
    def test_by_number_and_protocol(self):
 
401
        self.assertItemsEqual(self.cache.retrieve(dict(ipServicePort=100, ipServiceProtocol='udp')), [
 
402
            ['service1', ['alias1', 'alias2'], 100, 'udp'],
 
403
        ])
 
404
        self.assertItemsEqual(self.cache.retrieve(dict(ipServicePort=100, ipServiceProtocol='tcp')), [
 
405
            ['service1', ['alias1', 'alias2'], 100, 'tcp'],
 
406
        ])
 
407
        self.assertItemsEqual(self.cache.retrieve(dict(ipServicePort=200, ipServiceProtocol='udp')), [
 
408
            ['service2', ['alias3', ], 200, 'udp'],
 
409
        ])
 
410
        self.assertItemsEqual(self.cache.retrieve(dict(ipServicePort=200, ipServiceProtocol='tcp')), [
 
411
        ])
 
412
 
 
413
    def test_by_alias(self):
 
414
        self.assertItemsEqual(self.cache.retrieve(dict(cn='alias1')), [
 
415
            ['service1', ['alias1', 'alias2'], 100, 'udp'],
 
416
            ['service1', ['alias1', 'alias2'], 100, 'tcp'],
 
417
        ])
 
418
        self.assertItemsEqual(self.cache.retrieve(dict(cn='alias3')), [
 
419
            ['service2', ['alias3', ], 200, 'udp'],
 
420
        ])
 
421
 
 
422
    def test_all(self):
 
423
        self.assertItemsEqual(self.cache.retrieve({}), [
 
424
            ['service1', ['alias1', 'alias2'], 100, 'tcp'],
 
425
            ['service1', ['alias1', 'alias2'], 100, 'udp'],
 
426
            ['service2', ['alias3', ], 200, 'udp'],
 
427
            ['service3', [], 300, 'udp'],
 
428
        ])
 
429
 
 
430
 
 
431
class Testshadow(unittest.TestCase):
 
432
 
 
433
    def setUp(self):
 
434
        import shadow
 
435
        cache = shadow.Cache()
 
436
        cache.store('name', 'passwd', 15639, 0, 7, -1, -1, -1, 0)
 
437
        cache.store('name2', 'passwd2', 15639, 0, 7, -1, -1, -1, 0)
 
438
        self.cache = cache
 
439
 
 
440
    def test_by_name(self):
 
441
        self.assertItemsEqual(self.cache.retrieve(dict(uid='name')), [
 
442
            [u'name', u'passwd', 15639, 0, 7, -1, -1, -1, 0],
 
443
        ])
 
444
        self.assertItemsEqual(self.cache.retrieve(dict(uid='name2')), [
 
445
            [u'name2', u'passwd2', 15639, 0, 7, -1, -1, -1, 0],
 
446
        ])
 
447
 
 
448
    def test_by_unknown_name(self):
 
449
        self.assertItemsEqual(self.cache.retrieve(dict(uid='notfound')), [])
 
450
 
 
451
    def test_all(self):
 
452
        self.assertItemsEqual(self.cache.retrieve({}), [
 
453
            [u'name', u'passwd', 15639, 0, 7, -1, -1, -1, 0],
 
454
            [u'name2', u'passwd2', 15639, 0, 7, -1, -1, -1, 0],
 
455
        ])
 
456
 
 
457
 
 
458
if __name__ == '__main__':
 
459
    unittest.main()