4
from landscape.user.provider import (UserProvider, UserNotFoundError,
7
from landscape.user.tests.helpers import FakeUserProvider
9
from landscape.tests.helpers import (
10
LandscapeTest, MakePathHelper)
13
class ProviderTest(LandscapeTest):
15
helpers = [MakePathHelper]
18
LandscapeTest.setUp(self)
19
self.shadow_file = self.make_path("""\
20
jdoe:$1$xFlQvTqe$cBtrNEDOIKMy/BuJoUdeG0:13348:0:99999:7:::
21
psmith:!:13348:0:99999:7:::
22
sbarnes:$1$q7sz09uw$q.A3526M/SHu8vUb.Jo1A/:13349:0:99999:7:::
25
self.passwd_file = self.make_path("""\
26
root:x:0:0:root:/root:/bin/bash
27
haldaemon:x:107:116:Hardware abstraction layer,,,:/home/haldaemon:/bin/false
28
kevin:x:1001:65534:Kevin,101,+44123123,+44123124:/home/kevin:/bin/bash
31
self.group_file = self.make_path("""\
33
cdrom:x:24:haldaemon,kevin
37
def test_get_uid(self):
39
Given a username L{UserProvider.get_uid} returns the UID or
40
raises a L{UserProviderError} if a match isn't found.
42
data = [("jdoe", "x", 1000, 1000, "JD,,,,", "/home/jdoe", "/bin/zsh")]
43
provider = FakeUserProvider(users=data, shadow_file=self.shadow_file)
44
self.assertEquals(provider.get_uid("jdoe"), 1000)
45
self.assertRaises(UserNotFoundError, provider.get_uid, "john")
47
def test_get_users(self):
48
"""Get users should return data for all users found on the system."""
49
data = [("jdoe", "x", 1000, 1000, "JD,,,,", "/home/jdoe", "/bin/zsh")]
50
provider = FakeUserProvider(users=data, shadow_file=self.shadow_file)
51
users = provider.get_users()
52
self.assertEquals(users, [{"username": "jdoe", "name": u"JD",
53
"uid": 1000, "enabled": True,
54
"location": None, "home-phone": None,
56
"primary-gid": 1000}])
58
def test_gecos_data(self):
60
Location, home phone number, and work phone number should be
61
correctly parsed out of the GECOS field, and included in the
64
data = [("jdoe", "x", 1000, 1000, "JD,Everywhere,7654321,123HOME,",
65
"/home/jdoe", "/bin/zsh")]
66
provider = FakeUserProvider(users=data, shadow_file=self.shadow_file)
67
users = provider.get_users()
68
self.assertEquals(users, [{"username": "jdoe", "name": u"JD",
69
"uid": 1000, "enabled": True,
70
"location": u"Everywhere",
71
"home-phone": u"123HOME",
72
"work-phone": u"7654321",
73
"primary-gid": 1000}])
75
def test_four_gecos_fields(self):
76
"""If a GECOS field only has four fields it should still work."""
77
data = [("jdoe", "x", 1000, 1000, "JD,Everywhere,7654321,123HOME",
78
"/home/jdoe", "/bin/zsh")]
79
provider = FakeUserProvider(users=data, shadow_file=self.shadow_file)
80
users = provider.get_users()
81
self.assertEquals(users, [{"username": "jdoe", "name": u"JD",
82
"uid": 1000, "enabled": True,
83
"location": u"Everywhere",
84
"home-phone": u"123HOME",
85
"work-phone": u"7654321",
86
"primary-gid": 1000}])
88
def test_old_school_gecos_data(self):
90
If C{useradd} is used to add users to a system the GECOS field
91
will be written as a comment. The client must be resilient to
94
data = [("jdoe", "x", 1000, 1000, "John Doe", "/home/jdoe", "/bin/zsh")]
95
provider = FakeUserProvider(users=data, shadow_file=self.shadow_file)
96
users = provider.get_users()
97
self.assertEquals(users, [{"username": "jdoe", "uid": 1000,
98
"enabled": True, "name": u"John Doe",
99
"location": None, "home-phone": None,
100
"work-phone": None, "primary-gid": 1000}])
102
def test_weird_gecos_data(self):
104
If GECOS data is malformed in a way that contains less than
105
four fields, read as many as are available.
107
data = [("jdoe", "x", 1000, 1000, "John Doe,Everywhere",
108
"/home/jdoe", "/bin/zsh")]
109
provider = FakeUserProvider(users=data, shadow_file=self.shadow_file)
110
users = provider.get_users()
111
self.assertEquals(users, [{"username": "jdoe", "uid": 1000,
112
"enabled": True, "name": "John Doe",
113
"location": "Everywhere",
114
"home-phone": None, "work-phone": None,
115
"primary-gid": 1000}])
117
def test_no_gecos_data(self):
119
When no data is provided in the GECOS field we should report
120
all optional fields as C{None}.
122
data = [("jdoe", "x", 1000, 1000, "", "/home/jdoe", "/bin/zsh")]
123
provider = FakeUserProvider(users=data, shadow_file=self.shadow_file)
124
users = provider.get_users()
125
self.assertEquals(users, [{"username": "jdoe", "uid": 1000,
126
"enabled": True, "name": None,
127
"location": None, "home-phone": None,
129
"primary-gid": 1000}])
131
def test_utf8_gecos_data(self):
132
"""Gecos fields should be decoded from utf-8 to unicode."""
133
name = u"Jos\N{LATIN SMALL LETTER E WITH ACUTE}"
134
location = "F\N{LATIN SMALL LETTER I WITH DIAERESIS}nland"
135
number = "N\N{LATIN SMALL LETTER AE}ver"
136
gecos = "%s,%s,%s,%s," % (name.encode("utf-8"),
137
location.encode("utf-8"),
138
number.encode("utf-8"),
139
number.encode("utf-8"))
140
data = [("jdoe", "x", 1000, 1000, gecos, "/home/jdoe", "/bin/zsh")]
141
provider = FakeUserProvider(users=data, shadow_file=self.shadow_file)
142
users = provider.get_users()
143
self.assertEquals(users[0]["name"], name)
144
self.assertEquals(users[0]["location"], location)
145
self.assertEquals(users[0]["home-phone"], number)
146
self.assertEquals(users[0]["work-phone"], number)
148
def test_non_utf8_data(self):
150
If a GECOS field contains non-UTF8 data, it should be replaced
154
unicode_unknown = u'\N{REPLACEMENT CHARACTER}'
155
data = [("jdoe", "x", 1000, 1000, "\255,\255,\255,\255", "/home/jdoe",
157
provider = FakeUserProvider(users=data, shadow_file=self.shadow_file)
158
users = provider.get_users()
159
self.assertEquals(users[0]["name"], unicode_unknown)
160
self.assertEquals(users[0]["location"], unicode_unknown)
161
self.assertEquals(users[0]["home-phone"], unicode_unknown)
162
self.assertEquals(users[0]["work-phone"], unicode_unknown)
164
def test_get_disabled_user(self):
165
"""The C{enabled} field should be C{False} for disabled users."""
166
data = [("psmith", "x", 1000, 1000,
167
"Peter Smith,,,,", "/home/psmith", "/bin/bash")]
168
provider = FakeUserProvider(users=data, shadow_file=self.shadow_file,
169
locked_users=["psmith"])
170
users = provider.get_users()
171
self.assertEquals(users, [
172
{"username": "psmith", "name": u"Peter Smith", "uid": 1000,
174
"location": None, "home-phone": None, "work-phone": None,
175
"primary-gid": 1000}])
177
def test_real_user_data(self):
178
"""L{UserProvider} should work with real data."""
179
provider = UserProvider()
180
provider.shadow_file = None
181
users = provider.get_users()
182
user_0 = pwd.getpwuid(0)
184
if user["username"] == user_0.pw_name:
185
self.assertEquals(user["uid"], 0)
186
user_0_name = user_0.pw_gecos.split(",")[0].decode(
188
self.assertEquals(user["name"], user_0_name)
191
self.fail("The user %s (uid=1000) was not found in the get_data "
192
"result." % (user_1000.pw_name,))
194
def test_get_users_duplicate_usernames(self):
196
Get users should return data for all users found on the system, but it
197
should exclude duplicate usernames,
199
data = [("jdoe", "x", 1000, 1000, "JD,,,,", "/home/jdoe", "/bin/zsh"),
200
("jdoe", "x", 1001, 1001, "JD,,,,", "/home/jdoe", "/bin/zsh")]
201
provider = FakeUserProvider(users=data, shadow_file=self.shadow_file)
202
users = provider.get_users()
203
self.assertEquals(users, [{"username": "jdoe", "name": u"JD",
204
"uid": 1000, "enabled": True,
205
"location": None, "home-phone": None,
206
"work-phone": None, "primary-gid": 1000}])
208
def test_get_users_duplicate_uids(self):
210
Get users should return data for all users found on the system,
211
including users with duplicated uids.
213
data = [("joe1", "x", 1000, 1000, "JD,,,,", "/home/joe1", "/bin/zsh"),
214
("joe2", "x", 1000, 1000, "JD,,,,", "/home/joe2", "/bin/zsh")]
215
provider = FakeUserProvider(users=data, shadow_file=self.shadow_file)
216
users = provider.get_users()
217
self.assertEquals(users, [{"username": "joe1", "name": u"JD",
218
"uid": 1000, "enabled": True,
219
"location": None, "home-phone": None,
220
"work-phone": None, "primary-gid": 1000},
221
{"username": "joe2", "name": u"JD",
222
"uid": 1000, "enabled": True,
223
"location": None, "home-phone": None,
224
"work-phone": None, "primary-gid": 1000}])
226
def test_user_not_in_shadow_file(self):
228
Given a username that doesn't exist in the shadow file, we should get a
229
UserProvider error rather than a KeyError.
230
raises a L{UserProviderError} if a match isn't found.
232
data = [("johndoe", "x", 1000, 1000, "JD,,,,", "/home/jdoe", "/bin/zsh")]
233
provider = FakeUserProvider(users=data, shadow_file=self.shadow_file)
234
users = provider.get_users()
235
self.assertEquals(len(users), 1)
236
self.assertEquals(sorted([x[0] for x in data]),["johndoe"])
238
def test_get_gid(self):
240
Given a username L{UserProvider.get_gid} returns the GID or
241
raises a L{UserProviderError} if a match isn't found.
243
provider = FakeUserProvider(groups=[("jdoe", "x", 1000, [])])
244
self.assertEquals(provider.get_gid("jdoe"), 1000)
245
self.assertRaises(GroupNotFoundError, provider.get_gid, "john")
247
def test_group_without_members(self):
249
L{UserProvider.get_groups} should include groups without
252
provider = FakeUserProvider(groups=[("jdoe", "x", 1000, [])])
253
self.assertEquals(provider.get_groups(),
254
[{"name": "jdoe", "gid": 1000, "members": []}])
256
def test_group_with_members(self):
257
"""L{UserProvider.get_groups} should include groups with members."""
258
users = [("jdoe", "x", 1000, 1000, "JD,,,,", "/home/jdoe", "/bin/zsh")]
259
groups = [("sales", "x", 50, ["jdoe"])]
260
provider = FakeUserProvider(users=users, shadow_file=self.shadow_file,
262
self.assertEquals(provider.get_groups(),
263
[{"name": "sales", "gid": 50, "members": ["jdoe"]}])
265
def test_group_with_unknown_members(self):
266
"""L{UserProvider.get_groups} should include groups with members.
268
If a member's userid isn't known to the system, it shouldn't be
271
users = [("jdoe", "x", 1000, 1000, "JD,,,,", "/home/jdoe", "/bin/zsh")]
272
groups = [("sales", "x", 50, ["jdoe", "kevin"])]
273
provider = FakeUserProvider(users=users, shadow_file=self.shadow_file,
275
self.assertEquals(provider.get_groups(),
276
[{"name": "sales", "gid": 50, "members": ["jdoe"]}])
279
def test_group_with_duplicate_members(self):
281
L{UserProvider.get_groups} should only report groups once.
282
If duplicates exist they should be removed. The problem
283
reported in bug #118799 is related to duplicates being
284
reported to the server.
286
users = [("jdoe", "x", 1000, 1000, "JD,,,,", "/home/jdoe", "/bin/zsh")]
287
groups = [("sales", "x", 50, ["jdoe", "jdoe"])]
288
provider = FakeUserProvider(users=users, shadow_file=self.shadow_file,
290
self.assertEquals(provider.get_groups(),
291
[{"name": "sales", "gid": 50, "members": ["jdoe"]}])
293
def test_group_with_duplicate_groupnames(self):
295
L{UserProvider.get_groups} should only report members once.
296
If duplicates exist they should be removed. The problem
297
reported in bug #118799 is related to duplicates being
298
reported to the server.
300
users = [("jdoe", "x", 1000, 1000, "JD,,,,", "/home/jdoe", "/bin/zsh")]
301
groups = [("sales", "x", 50, ["jdoe"]),
302
("sales", "x", 51, ["jdoe"]),
304
provider = FakeUserProvider(users=users, shadow_file=self.shadow_file,
306
self.assertEquals(provider.get_groups(),
307
[{"name": "sales", "gid": 50, "members": ["jdoe"]}])
309
def test_real_group_data(self):
311
Assert that L{UserProvider.get_group}'s functionality
312
reflects what is accessible from the Python standard C{grp}
315
provider = UserProvider()
316
group_0 = grp.getgrgid(0)
317
groups = provider.get_groups()
319
if group["name"] == group_0.gr_name:
320
self.assertEquals(group["gid"], 0)
321
self.assertEquals(group["members"], group_0.gr_mem)
324
self.fail("The group %s (gid=1000) was not found in the get_data "
325
"result." % (group_1000.gr_name,))
327
def test_get_user_data(self):
328
"""This tests the functionality for parsing /etc/passwd style files."""
329
provider = UserProvider(passwd_file=self.passwd_file,
330
group_file=self.group_file)
331
users = provider.get_user_data()
332
self.assertEquals(users[0], ("root", "x", 0, 0, "root", "/root",
334
self.assertEquals(users[1], ("haldaemon", "x", 107, 116,
335
"Hardware abstraction layer,,,",
336
"/home/haldaemon", "/bin/false"))
337
self.assertEquals(users[2], ("kevin", "x", 1001, 65534,
338
"Kevin,101,+44123123,+44123124",
339
"/home/kevin", "/bin/bash"))
341
def test_get_users(self):
343
The method get_users is responsible for translating tuples of
344
information from the underlying user database into dictionaries.
346
provider = UserProvider(passwd_file=self.passwd_file,
347
group_file=self.group_file)
348
users = provider.get_users()
349
self.assertEquals(users[0], {"username": "root",
351
"uid": 0, "enabled": True,
356
self.assertEquals(users[1], {"username": "haldaemon",
357
"name": u"Hardware abstraction layer",
364
self.assertEquals(users[2], {"username": "kevin",
369
"home-phone": u"+44123124",
370
"work-phone": u"+44123123",
371
"primary-gid": 65534})
373
def test_get_group_data(self):
374
"""This tests the functionality for parsing /etc/group style files."""
375
provider = UserProvider(passwd_file=self.passwd_file,
376
group_file=self.group_file)
377
groups = provider.get_group_data()
378
self.assertEquals(groups[0], (u"root", u"x", 0, [u""]))
379
self.assertEquals(groups[1], (u"cdrom", u"x", 24,
380
[u"haldaemon", u"kevin"]))
381
self.assertEquals(groups[2], (u"kevin", u"x", 1000, [u""]))
383
def test_get_groups(self):
385
The method get_groups is responsible for translating tuples of data
386
from the underlying userdatabase into dictionaries.
388
provider = UserProvider(passwd_file=self.passwd_file,
389
group_file=self.group_file)
390
groups = provider.get_groups()
391
self.assertEquals(groups[0], {"name": u"root",
394
self.assertEquals(groups[1], {"name": u"cdrom",
396
"members": [u"kevin", u"haldaemon"]})
397
self.assertEquals(groups[2], {"name": u"kevin",
401
def test_get_users_incorrect_passwd_file(self):
403
This tests the functionality for parsing /etc/passwd style files.
405
Incorrectly formatted lines according to passwd(5) should be ignored
408
passwd_file = self.make_path("""\
409
root:x:0:0:root:/root:/bin/bash
411
haldaemon:x:107:Hardware abstraction layer,,,:/home/haldaemon:/bin/false
412
kevin:x:1001:65534:Kevin,101,+44123123,+44123124:/home/kevin:/bin/bash
417
provider = UserProvider(passwd_file=passwd_file,
418
group_file=self.group_file)
419
users = provider.get_users()
420
self.assertEquals(users[0], {"username": "root",
422
"uid": 0, "enabled": True,
427
self.assertEquals(users[1], {"username": "kevin",
432
"home-phone": u"+44123124",
433
"work-phone": u"+44123123",
434
"primary-gid": 65534})
435
log = ("WARNING: passwd file %s is incorrectly formatted: line 2." %
437
self.assertIn(log, self.logfile.getvalue())
438
log2 = ("WARNING: passwd file %s is incorrectly formatted: line 3." %
440
self.assertIn(log2, self.logfile.getvalue())
441
log3 = ("WARNING: passwd file %s is incorrectly formatted: line 6." %
443
self.assertIn(log3, self.logfile.getvalue())
445
def test_get_users_nis_line(self):
447
This tests the functionality for parsing /etc/passwd style files.
449
We should ignore the specific pattern for NIS user-extensions in passwd
452
passwd_file = self.make_path("""\
453
root:x:0:0:root:/root:/bin/bash
454
kevin:x:1001:65534:Kevin,101,+44123123,+44123124:/home/kevin:/bin/bash
460
provider = UserProvider(passwd_file=passwd_file,
461
group_file=self.group_file)
462
users = provider.get_users()
463
self.assertTrue(len(users), 2)
464
self.assertEquals(users[0], {"username": "root",
466
"uid": 0, "enabled": True,
471
self.assertEquals(users[1], {"username": "kevin",
476
"home-phone": u"+44123124",
477
"work-phone": u"+44123123",
478
"primary-gid": 65534})
479
log = ("WARNING: passwd file %s is incorrectly formatted" %
481
self.assertTrue(log not in self.logfile.getvalue())
483
def test_get_groups_incorrect_groups_file(self):
485
This tests the functionality for parsing /etc/group style files.
487
Incorrectly formatted lines according to group(5) should be ignored
490
group_file = self.make_path("""\
495
provider = UserProvider(passwd_file=self.passwd_file,
496
group_file=group_file)
497
groups = provider.get_groups()
498
self.assertEquals(groups[0], {"name": u"root", "gid": 0,
500
self.assertEquals(groups[1], {"name": u"cdrom", "gid": 24,
502
log = ("WARNING: group file %s is incorrectly formatted: line 3." % group_file)
503
self.assertIn(log, self.logfile.getvalue())
505
def test_get_groups_nis_line(self):
507
This tests the functionality for parsing /etc/group style files.
509
We should ignore the specific pattern for NIS user-extensions in group
512
group_file = self.make_path("""\
519
provider = UserProvider(passwd_file=self.passwd_file,
520
group_file=group_file)
521
groups = provider.get_groups()
522
self.assertEquals(groups[0], {"name": u"root", "gid": 0,
524
log = ("WARNING: group file %s is incorrectly formatted" % group_file)
525
self.assertTrue(log not in self.logfile.getvalue())