1
from landscape.user.management import UserManagementError
2
from landscape.user.provider import UserProviderBase
5
class FakeUserManagement(object):
7
def __init__(self, provider=None):
8
self.shadow_file = getattr(provider, "shadow_file", None)
9
self.provider = provider
12
for data in self.provider.get_users():
13
self._users[data["username"]] = data
15
for data in self.provider.get_groups():
16
self._groups[data["name"]] = data
18
def _make_fake_shadow_file(self, locked_users, unlocked_users):
19
entry = "%s:%s:13348:0:99999:7:::\n"
20
shadow_file = open(self.shadow_file, "w")
21
for user in locked_users:
22
shadow_file.write(entry % (user, "!"))
23
for user in unlocked_users:
24
shadow_file.write(entry % (user, "qweqweqeqweqw"))
27
def add_user(self, username, name, password, require_password_reset,
28
primary_group_name, location, work_phone, home_phone):
32
uid = max([x["uid"] for x in self._users.itervalues()]) + 1
34
primary_gid = self.get_gid(primary_group_name)
37
self._users[uid] = {"username": username, "name": name,
38
"uid": uid, "enabled": True,
39
"location": location, "work-phone": work_phone,
40
"home-phone": home_phone,
41
"primary-gid": primary_gid}
42
gecos_string = "%s,%s,%s,%s" % (name, location or "",
43
work_phone or "", home_phone or "")
44
userdata = (username, "x", uid, primary_gid, gecos_string,
45
"/bin/sh" , "/home/user")
46
self.provider.users.append(userdata)
48
raise UserManagementError("add_user failed")
49
return "add_user succeeded"
51
def lock_user(self, username):
52
data = self._users.get(username, None)
54
data["enabled"] = False
55
# This will generate a shadow file with only the locked user in it.
56
self._make_fake_shadow_file([username], [])
57
return "lock_user succeeded"
58
raise UserManagementError("lock_user failed")
60
def unlock_user(self, username):
61
data = self._users.get(username, None)
63
data["enabled"] = True
64
# This will generate a shadow file with only the unlocked user in it.
65
self._make_fake_shadow_file([], [username])
66
return "unlock_user succeeded"
67
raise UserManagementError("unlock_user failed")
69
def remove_user(self, username, delete_home=False):
71
del self._users[username]
73
raise UserManagementError("remove_user failed")
75
for user in self.provider.users:
76
if user[0] != username:
77
remaining_users.append(user)
78
self.provider.users = remaining_users
79
return "remove_user succeeded"
81
def set_user_details(self, username, password=None, name=None,
82
location=None, work_number=None, home_number=None,
83
primary_group_name=None):
84
data = self._users.setdefault(username, {})
85
for key, value in [("name", name), ("location", location),
86
("work-phone", work_number),
87
("home-phone", home_number)]:
90
if primary_group_name:
91
data["primary-gid"] = self.get_gid(primary_group_name)
93
data["primary-gid"] = None
94
userdata = (username, "x", data["uid"], data["primary-gid"],
95
"%s,%s,%s,%s," % (name, location, work_number,
97
"/bin/sh" , "/home/user")
98
self.provider.users = [userdata]
99
return "set_user_details succeeded"
101
def get_gid(self, name):
103
return self._groups[name]["gid"]
105
raise UserManagementError("Group %s wasn't found." % name)
107
def add_group(self, name):
110
gid = max([x["gid"] for x in self._groups.itervalues()]) + 1
111
self._groups[name] = {"name": name, "gid": gid, "members": []}
112
self.update_provider_from_groups()
113
return "add_group succeeded"
115
def set_group_details(self, group, new_name):
116
data = self._groups[group]
117
data["name"] = new_name
118
self._groups[new_name] = data
119
del self._groups[group]
120
self.update_provider_from_groups()
121
return "set_group_details succeeded"
123
def add_group_member(self, username, group):
124
data = self._groups[group]
126
data["members"].append(username)
127
self.update_provider_from_groups()
128
return "add_group_member succeeded"
129
raise UserManagementError("add_group_member failed")
131
def remove_group_member(self, username, group):
132
if group in self._groups:
133
data = self._groups[group]
134
data["members"].remove(username)
135
self.update_provider_from_groups()
136
return "remove_group_member succeeded"
137
raise UserManagementError("remove_group_member failed")
139
def remove_group(self, group):
140
del self._groups[group]
141
self.update_provider_from_groups()
142
return "remove_group succeeded"
144
def update_provider_from_groups(self):
146
for k, v in self._groups.iteritems():
147
provider_list.append((k, "x", v["gid"], v["members"]))
148
self.provider.groups = provider_list
151
class FakeUserProvider(UserProviderBase):
153
def __init__(self, users=None, groups=None, popen=None, shadow_file=None,
159
self.shadow_file = shadow_file
160
super(FakeUserProvider, self).__init__(locked_users=locked_users)
162
def get_user_data(self, system=False):
163
if self.users is None:
167
def get_group_data(self):
168
if self.groups is None:
172
class FakeUserInfo(object):
173
"""Implements enough functionality to work for Changes tests."""
175
persist_name = "users"
178
def __init__(self, provider):
179
self._provider = provider
181
def register(self, manager):
182
self._manager = manager
183
self._persist = self._manager.persist.root_at("users")