~ack/landscape-client/sources.list-preserve-old-permissions

« back to all changes in this revision

Viewing changes to landscape/user/tests/helpers.py

  • Committer: Christopher Armstrong
  • Date: 2008-06-10 10:56:01 UTC
  • Revision ID: radix@twistedmatrix.com-20080610105601-l9qfvqjf88e7j8b6
Import landscape-client into public branch

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
from landscape.user.management import UserManagementError
 
2
from landscape.user.provider import UserProviderBase
 
3
 
 
4
 
 
5
class FakeUserManagement(object):
    """In-memory stand-in for a user management backend, for use in tests.

    User and group records are kept in the C{_users} and C{_groups}
    dictionaries (keyed by username and by group name respectively) and
    changes are mirrored into the C{users} / C{groups} attributes of the
    provider passed to the constructor.
    """

    def __init__(self, provider=None):
        """Seed the fake from C{provider}.

        Although C{provider} defaults to C{None}, the constructor calls
        C{provider.get_users()} and C{provider.get_groups()}, so a real
        provider object is required in practice.
        """
        self.shadow_file = getattr(provider, "shadow_file", None)
        self.provider = provider
        self._users = {}

        for data in self.provider.get_users():
            self._users[data["username"]] = data
        self._groups = {}
        for data in self.provider.get_groups():
            self._groups[data["name"]] = data

    def _make_fake_shadow_file(self, locked_users, unlocked_users):
        """Overwrite C{self.shadow_file} with shadow-style entries.

        Locked users get C{"!"} in the password field, unlocked users get
        a dummy non-empty value.
        """
        entry = "%s:%s:13348:0:99999:7:::\n"
        # The context manager closes the file even if a write fails.
        with open(self.shadow_file, "w") as shadow_file:
            for user in locked_users:
                shadow_file.write(entry % (user, "!"))
            for user in unlocked_users:
                shadow_file.write(entry % (user, "qweqweqeqweqw"))

    def add_user(self, username, name, password, require_password_reset,
                 primary_group_name):
        """Register a new user and append it to the provider's user list.

        The uid is one more than the highest known uid, or 1000 when no
        users exist.  When groups exist, the primary gid is looked up
        from C{primary_group_name}; otherwise the uid is reused as gid.
        C{password} and C{require_password_reset} are accepted but unused.

        @raise UserManagementError: if an existing user record has no
            C{"uid"} key, or (from L{get_gid}) if the group is unknown.
        """
        try:
            uid = 1000
            if self._users:
                uid = max(x["uid"] for x in self._users.values()) + 1
            if self._groups:
                primary_gid = self.get_gid(primary_group_name)
            else:
                primary_gid = uid
            # Key by username, like every other method of this class, so
            # that a freshly added user can be locked, unlocked, edited
            # and removed.
            self._users[username] = {"username": username, "name": name,
                                     "uid": uid, "enabled": True,
                                     "location": None, "work-phone": None,
                                     "home-phone": None,
                                     "primary-gid": primary_gid}
            userdata = (username, "x", uid, primary_gid, "%s,,,," % name,
                        "/bin/sh", "/home/user")
            self.provider.users.append(userdata)
        except KeyError:
            raise UserManagementError("add_user failed")
        return "add_user succeeded"

    def lock_user(self, username):
        """Mark C{username} as disabled and rewrite the fake shadow file.

        @raise UserManagementError: if the user is unknown.
        """
        data = self._users.get(username, None)
        if data:
            data["enabled"] = False
            # This will generate a shadow file with only the locked user
            # in it.
            self._make_fake_shadow_file([username], [])
            return "lock_user succeeded"
        raise UserManagementError("lock_user failed")

    def unlock_user(self, username):
        """Mark C{username} as enabled and rewrite the fake shadow file.

        @raise UserManagementError: if the user is unknown.
        """
        data = self._users.get(username, None)
        if data:
            data["enabled"] = True
            # This will generate a shadow file with only the unlocked user
            # in it.
            self._make_fake_shadow_file([], [username])
            return "unlock_user succeeded"
        raise UserManagementError("unlock_user failed")

    def remove_user(self, username, delete_home=False):
        """Forget C{username} and drop it from the provider's user list.

        C{delete_home} is accepted but unused.

        @raise UserManagementError: if the user is unknown.
        """
        try:
            del self._users[username]
        except KeyError:
            raise UserManagementError("remove_user failed")
        self.provider.users = [user for user in self.provider.users
                               if user[0] != username]
        return "remove_user succeeded"

    def set_user_details(self, username, password=None, name=None,
                         location=None, work_number=None, home_number=None,
                         primary_group_name=None):
        """Update the stored details of C{username}.

        Only truthy values overwrite the stored name, location and phone
        numbers.  The primary gid is reset to C{None} when no
        C{primary_group_name} is given.  C{password} is accepted but
        unused.
        """
        # NOTE(review): for an unknown username this inserts an empty
        # record and then fails with KeyError on data["uid"] below;
        # behaviour kept as-is, confirm whether callers rely on it.
        data = self._users.setdefault(username, {})
        for key, value in [("name", name), ("location", location),
                           ("work-phone", work_number),
                           ("home-phone", home_number)]:
            if value:
                data[key] = value
        if primary_group_name:
            data["primary-gid"] = self.get_gid(primary_group_name)
        else:
            data["primary-gid"] = None
        # The GECOS field is built from the raw arguments, not from the
        # stored record, so omitted values are rendered as "None".
        userdata = (username, "x", data["uid"], data["primary-gid"],
                    "%s,%s,%s,%s," % (name, location, work_number,
                                      home_number),
                    "/bin/sh", "/home/user")
        # NOTE(review): this replaces the provider's whole user list with
        # the single edited user; presumably intentional for the tests.
        self.provider.users = [userdata]
        return "set_user_details succeeded"

    def get_gid(self, name):
        """Return the gid of the group called C{name}.

        @raise UserManagementError: if no such group (or gid) is stored.
        """
        try:
            return self._groups[name]["gid"]
        except KeyError:
            raise UserManagementError("Group %s wasn't found." % name)

    def add_group(self, name):
        """Create an empty group and sync the provider's group list.

        The gid is one more than the highest known gid, or 1000 when no
        groups exist.
        """
        gid = 1000
        if self._groups:
            gid = max(x["gid"] for x in self._groups.values()) + 1
        self._groups[name] = {"name": name, "gid": gid, "members": []}
        self.update_provider_from_groups()
        return "add_group succeeded"

    def set_group_details(self, group, new_name):
        """Rename C{group} to C{new_name} and sync the provider.

        An unknown C{group} raises C{KeyError}.
        """
        # Pop before re-inserting so that renaming a group to its current
        # name keeps the record instead of deleting it.
        data = self._groups.pop(group)
        data["name"] = new_name
        self._groups[new_name] = data
        self.update_provider_from_groups()
        return "set_group_details succeeded"

    def add_group_member(self, username, group):
        """Append C{username} to the members of C{group}.

        @raise UserManagementError: if the group is unknown.
        """
        # .get() makes the failure branch below reachable; plain indexing
        # raised KeyError before the check could run.
        data = self._groups.get(group)
        if data:
            data["members"].append(username)
            self.update_provider_from_groups()
            return "add_group_member succeeded"
        raise UserManagementError("add_group_member failed")

    def remove_group_member(self, username, group):
        """Remove C{username} from the members of C{group}.

        C{list.remove} raises C{ValueError} if the user is not a member.

        @raise UserManagementError: if the group is unknown.
        """
        if group in self._groups:
            data = self._groups[group]
            data["members"].remove(username)
            self.update_provider_from_groups()
            return "remove_group_member succeeded"
        raise UserManagementError("remove_group_member failed")

    def remove_group(self, group):
        """Delete C{group} and sync the provider.

        An unknown C{group} raises C{KeyError}.
        """
        del self._groups[group]
        self.update_provider_from_groups()
        return "remove_group succeeded"

    def update_provider_from_groups(self):
        """Rebuild C{provider.groups} from the stored group records.

        Each entry is a C{(name, "x", gid, members)} tuple; the members
        list is the stored list itself, not a copy.
        """
        self.provider.groups = [(k, "x", v["gid"], v["members"])
                                for k, v in self._groups.items()]
 
146
 
 
147
 
 
148
class FakeUserProvider(UserProviderBase):
    """User provider backed by plain in-memory lists, for use in tests.

    The C{users} and C{groups} attributes hold whatever was passed to the
    constructor and are returned unchanged by L{get_user_data} and
    L{get_group_data}.
    """

    def __init__(self, users=None, groups=None, popen=None, shadow_file=None,
                 locked_users=None):
        """Store the canned data, then initialise the base provider.

        C{popen} is only set as an attribute when a truthy value is given;
        C{locked_users} is forwarded to the base class constructor.
        """
        self.users = users
        self.groups = groups
        if popen:
            self.popen = popen
        self.shadow_file = shadow_file
        super(FakeUserProvider, self).__init__(locked_users=locked_users)

    def get_user_data(self, system=False):
        """Return the stored user list; C{system} is ignored.

        A missing (C{None}) list is replaced by a new empty list, which is
        also stored on the instance.
        """
        current = self.users
        if current is None:
            current = self.users = []
        return current

    def get_group_data(self):
        """Return the stored group list.

        A missing (C{None}) list is replaced by a new empty list, which is
        also stored on the instance.
        """
        current = self.groups
        if current is None:
            current = self.groups = []
        return current
 
168
 
 
169
class FakeUserInfo(object):
 
170
    """Implements enough functionality to work for Changes tests."""
 
171
 
 
172
    persist_name = "users"
 
173
    run_interval = 60
 
174
 
 
175
    def __init__(self, provider):
 
176
        self._provider = provider
 
177
 
 
178
    def register(self, manager):
 
179
        self._manager = manager
 
180
        self._persist = self._manager.persist.root_at("users")