1
from landscape.user.management import UserManagementError
2
from landscape.user.provider import UserProviderBase
5
class FakeUserManagement(object):
    """In-memory stand-in for the user management API, for use in tests.

    User and group records live in two dictionaries (``_users`` keyed by
    username, ``_groups`` keyed by group name) that are seeded from a
    provider object.  Mutations are mirrored onto the provider's ``users``
    and ``groups`` lists.  Every operation returns a
    ``"<operation> succeeded"`` string, or raises ``UserManagementError``
    where a failure path is defined.
    """

    def __init__(self, provider=None):
        """Seed the fake from ``provider``.

        ``provider`` must offer ``get_users()`` and ``get_groups()``
        returning iterables of dicts carrying at least a ``"username"`` /
        ``"name"`` key respectively.  Its optional ``shadow_file``
        attribute is the path the fake shadow file is written to.  Despite
        the ``None`` default, a provider is required: construction fails
        with ``AttributeError`` without one.
        """
        self.shadow_file = getattr(provider, "shadow_file", None)
        self.provider = provider

        # Both registries must exist before they are filled below and
        # before any other method touches them.
        self._users = {}
        for data in self.provider.get_users():
            self._users[data["username"]] = data

        self._groups = {}
        for data in self.provider.get_groups():
            self._groups[data["name"]] = data

    def _make_fake_shadow_file(self, locked_users, unlocked_users):
        """(Re)write ``self.shadow_file`` with one entry per given user.

        Locked users get ``!`` in the password field, unlocked users get a
        dummy hash.  The file is truncated first, so it only ever contains
        the users passed to this call.
        """
        entry = "%s:%s:13348:0:99999:7:::\n"
        # The context manager guarantees the data is flushed and the handle
        # released before callers read the file back.
        with open(self.shadow_file, "w") as shadow_file:
            for user in locked_users:
                shadow_file.write(entry % (user, "!"))
            for user in unlocked_users:
                shadow_file.write(entry % (user, "qweqweqeqweqw"))

    def add_user(self, username, name, password, require_password_reset,
                 primary_group_name):
        """Register a new, enabled user and append it to the provider.

        The uid is one above the highest known uid, or 1000 when no users
        exist yet.  ``password`` and ``require_password_reset`` are
        accepted for interface compatibility and ignored.

        Returns ``"add_user succeeded"``.  Raises ``UserManagementError``
        if an existing record lacks a ``"uid"`` or if
        ``primary_group_name`` is unknown while groups are registered.
        """
        try:
            uid = 1000
            if self._users:
                uid = max(x["uid"] for x in self._users.values()) + 1
            if self._groups:
                primary_gid = self.get_gid(primary_group_name)
            else:
                # No groups registered, so there is nothing to look up:
                # fall back to a gid equal to the uid.
                primary_gid = uid
            # Key by username, like every other lookup in this class
            # (__init__, lock_user, remove_user, ...), so a freshly added
            # user can be locked, updated or removed afterwards.
            self._users[username] = {
                "username": username, "name": name,
                "uid": uid, "enabled": True,
                "location": None, "work-phone": None,
                "home-phone": None, "primary-gid": primary_gid}
            userdata = (username, "x", uid, primary_gid, "%s,,,," % name,
                        "/bin/sh", "/home/user")
            self.provider.users.append(userdata)
        except KeyError:
            raise UserManagementError("add_user failed")
        return "add_user succeeded"

    def lock_user(self, username):
        """Mark ``username`` as disabled and write a shadow file locking it.

        Returns ``"lock_user succeeded"``; raises ``UserManagementError``
        when the user is unknown.
        """
        data = self._users.get(username)
        if data:
            data["enabled"] = False
            # This will generate a shadow file with only the locked user
            # in it.
            self._make_fake_shadow_file([username], [])
            return "lock_user succeeded"
        raise UserManagementError("lock_user failed")

    def unlock_user(self, username):
        """Mark ``username`` as enabled and write a shadow file unlocking it.

        Returns ``"unlock_user succeeded"``; raises
        ``UserManagementError`` when the user is unknown.
        """
        data = self._users.get(username)
        if data:
            data["enabled"] = True
            # This will generate a shadow file with only the unlocked user
            # in it.
            self._make_fake_shadow_file([], [username])
            return "unlock_user succeeded"
        raise UserManagementError("unlock_user failed")

    def remove_user(self, username, delete_home=False):
        """Forget ``username`` and drop it from the provider's user list.

        ``delete_home`` is accepted for interface compatibility and
        ignored.  Returns ``"remove_user succeeded"``; raises
        ``UserManagementError`` when the user is unknown.
        """
        try:
            del self._users[username]
        except KeyError:
            raise UserManagementError("remove_user failed")
        # Provider entries are tuples whose first field is the username.
        self.provider.users = [user for user in self.provider.users
                               if user[0] != username]
        return "remove_user succeeded"

    def set_user_details(self, username, password=None, name=None,
                         location=None, work_number=None, home_number=None,
                         primary_group_name=None):
        """Update the stored details of ``username``.

        Only truthy values overwrite the stored name/location/phone
        fields.  The primary gid is always reset: to the gid of
        ``primary_group_name`` when given, otherwise to ``None``.  The
        provider's user list is replaced by a single entry for this user,
        whose GECOS field is built from the raw arguments (unset ones
        render as ``None``).  ``password`` is ignored.

        Returns ``"set_user_details succeeded"``.  Raises ``KeyError``
        when the user has no ``"uid"`` (e.g. it was never added) and
        ``UserManagementError`` when the group is unknown.
        """
        data = self._users.setdefault(username, {})
        for key, value in [("name", name), ("location", location),
                           ("work-phone", work_number),
                           ("home-phone", home_number)]:
            if value:
                data[key] = value
        if primary_group_name:
            data["primary-gid"] = self.get_gid(primary_group_name)
        else:
            data["primary-gid"] = None
        userdata = (username, "x", data["uid"], data["primary-gid"],
                    "%s,%s,%s,%s," % (name, location, work_number,
                                      home_number),
                    "/bin/sh", "/home/user")
        self.provider.users = [userdata]
        return "set_user_details succeeded"

    def get_gid(self, name):
        """Return the gid of group ``name``.

        Raises ``UserManagementError`` when the group is unknown.
        """
        try:
            return self._groups[name]["gid"]
        except KeyError:
            raise UserManagementError("Group %s wasn't found." % name)

    def add_group(self, name):
        """Create an empty group ``name`` and sync the provider.

        The gid is one above the highest known gid, or 1000 when no
        groups exist yet.  Returns ``"add_group succeeded"``.
        """
        gid = 1000
        if self._groups:
            gid = max(x["gid"] for x in self._groups.values()) + 1
        self._groups[name] = {"name": name, "gid": gid, "members": []}
        self.update_provider_from_groups()
        return "add_group succeeded"

    def set_group_details(self, group, new_name):
        """Rename ``group`` to ``new_name`` and sync the provider.

        Returns ``"set_group_details succeeded"``; raises ``KeyError``
        when ``group`` is unknown.
        """
        # Remove the old key *before* inserting the new one, so that
        # renaming a group to its current name does not delete it.
        data = self._groups.pop(group)
        data["name"] = new_name
        self._groups[new_name] = data
        self.update_provider_from_groups()
        return "set_group_details succeeded"

    def add_group_member(self, username, group):
        """Append ``username`` to the members of ``group``.

        Returns ``"add_group_member succeeded"``; raises
        ``UserManagementError`` when the group is unknown.
        """
        # .get() lets an unknown group reach the UserManagementError
        # below instead of escaping as a bare KeyError.
        data = self._groups.get(group)
        if data is None:
            raise UserManagementError("add_group_member failed")
        data["members"].append(username)
        self.update_provider_from_groups()
        return "add_group_member succeeded"

    def remove_group_member(self, username, group):
        """Remove ``username`` from the members of ``group``.

        Returns ``"remove_group_member succeeded"``; raises
        ``UserManagementError`` when the group is unknown and
        ``ValueError`` when the user is not a member.
        """
        if group in self._groups:
            data = self._groups[group]
            data["members"].remove(username)
            self.update_provider_from_groups()
            return "remove_group_member succeeded"
        raise UserManagementError("remove_group_member failed")

    def remove_group(self, group):
        """Delete ``group`` and sync the provider.

        Returns ``"remove_group succeeded"``; raises ``KeyError`` when the
        group is unknown.
        """
        del self._groups[group]
        self.update_provider_from_groups()
        return "remove_group succeeded"

    def update_provider_from_groups(self):
        """Rebuild ``provider.groups`` from the in-memory group registry.

        Each entry is a ``(name, "x", gid, members)`` tuple.
        """
        self.provider.groups = [(k, "x", v["gid"], v["members"])
                                for k, v in self._groups.items()]
148
class FakeUserProvider(UserProviderBase):
    """User provider that serves user and group data from plain lists.

    ``users`` and ``groups`` are public attributes that test code may
    read, append to, or replace between calls.
    """

    def __init__(self, users=None, groups=None, popen=None, shadow_file=None,
                 locked_users=None):
        """Store the canned data and initialise the base provider.

        ``users`` / ``groups`` are the lists later returned by
        ``get_user_data`` / ``get_group_data``; ``None`` means "start
        empty".  ``shadow_file`` is kept as an attribute and
        ``locked_users`` is forwarded to the base class.
        """
        self.users = users
        self.groups = groups
        # NOTE(review): a truthy ``popen`` is kept as an instance attribute;
        # confirm this matches what the base class expects.
        if popen:
            self.popen = popen
        self.shadow_file = shadow_file
        super(FakeUserProvider, self).__init__(locked_users=locked_users)

    def get_user_data(self, system=False):
        """Return the list of user entries, creating it on first use.

        ``system`` is accepted for interface compatibility and ignored.
        """
        if self.users is None:
            self.users = []
        return self.users

    def get_group_data(self):
        """Return the list of group entries, creating it on first use."""
        if self.groups is None:
            self.groups = []
        return self.groups
169
class FakeUserInfo(object):
    """Implements enough functionality to work for Changes tests."""

    persist_name = "users"

    def __init__(self, provider):
        """Remember the user provider this fake plugin reads from."""
        self._provider = provider

    def register(self, manager):
        """Attach to ``manager`` and root persisted state at ``"users"``."""
        self._manager = manager
        persist_root = manager.persist.root_at("users")
        self._persist = persist_root