~ahasenack/landscape-client/landscape-client-1.5.5-0ubuntu0.9.04.0

« back to all changes in this revision

Viewing changes to landscape/user/tests/helpers.py

  • Committer: Bazaar Package Importer
  • Author(s): Rick Clark
  • Date: 2008-09-08 16:35:57 UTC
  • mfrom: (1.1.1 upstream)
  • Revision ID: james.westby@ubuntu.com-20080908163557-l3ixzj5dxz37wnw2
Tags: 1.0.18-0ubuntu1
New upstream release 

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
from landscape.user.management import UserManagementError
 
2
from landscape.user.provider import UserProviderBase
 
3
 
 
4
 
 
5
class FakeUserManagement(object):
 
6
 
 
7
    def __init__(self, provider=None):
 
8
        self.shadow_file = getattr(provider, "shadow_file", None)
 
9
        self.provider = provider
 
10
        self._users = {}
 
11
 
 
12
        for data in self.provider.get_users():
 
13
            self._users[data["username"]] = data
 
14
        self._groups = {}
 
15
        for data in self.provider.get_groups():
 
16
            self._groups[data["name"]] = data
 
17
 
 
18
    def _make_fake_shadow_file(self, locked_users, unlocked_users):
 
19
        entry = "%s:%s:13348:0:99999:7:::\n"
 
20
        shadow_file = open(self.shadow_file, "w")
 
21
        for user in locked_users:
 
22
            shadow_file.write(entry % (user, "!"))
 
23
        for user in unlocked_users:
 
24
            shadow_file.write(entry % (user, "qweqweqeqweqw"))
 
25
        shadow_file.close()
 
26
 
 
27
    def add_user(self, username, name, password, require_password_reset,
 
28
                 primary_group_name, location, work_phone, home_phone):
 
29
        try:
 
30
            uid = 1000
 
31
            if self._users:
 
32
                uid = max([x["uid"] for x in self._users.itervalues()]) + 1
 
33
            if self._groups:
 
34
                primary_gid = self.get_gid(primary_group_name)
 
35
            else:
 
36
                primary_gid = uid
 
37
            self._users[uid] = {"username": username, "name": name,
 
38
                                "uid": uid, "enabled": True,
 
39
                                "location": location, "work-phone": work_phone,
 
40
                                "home-phone": home_phone,
 
41
                                "primary-gid": primary_gid}
 
42
            gecos_string = "%s,%s,%s,%s" % (name, location or "",
 
43
                                            work_phone or "", home_phone or "")
 
44
            userdata = (username, "x", uid, primary_gid, gecos_string,
 
45
                        "/bin/sh" , "/home/user")
 
46
            self.provider.users.append(userdata)
 
47
        except KeyError:
 
48
            raise UserManagementError("add_user failed")
 
49
        return "add_user succeeded"
 
50
 
 
51
    def lock_user(self, username):
 
52
        data = self._users.get(username, None)
 
53
        if data:
 
54
            data["enabled"] = False
 
55
            # This will generate a shadow file with only the locked user in it.
 
56
            self._make_fake_shadow_file([username], [])
 
57
            return "lock_user succeeded"
 
58
        raise UserManagementError("lock_user failed")
 
59
 
 
60
    def unlock_user(self, username):
 
61
        data = self._users.get(username, None)
 
62
        if data:
 
63
            data["enabled"] = True
 
64
            # This will generate a shadow file with only the unlocked user in it.
 
65
            self._make_fake_shadow_file([], [username])
 
66
            return "unlock_user succeeded"
 
67
        raise UserManagementError("unlock_user failed")
 
68
 
 
69
    def remove_user(self, username, delete_home=False):
 
70
        try:
 
71
            del self._users[username]
 
72
        except KeyError:
 
73
            raise UserManagementError("remove_user failed")
 
74
        remaining_users = []
 
75
        for user in self.provider.users:
 
76
            if user[0] != username:
 
77
                remaining_users.append(user)
 
78
        self.provider.users = remaining_users
 
79
        return "remove_user succeeded"
 
80
 
 
81
    def set_user_details(self, username, password=None, name=None,
 
82
                         location=None, work_number=None, home_number=None,
 
83
                         primary_group_name=None):
 
84
        data = self._users.setdefault(username, {})
 
85
        for key, value in [("name", name), ("location", location),
 
86
                            ("work-phone", work_number),
 
87
                            ("home-phone", home_number)]:
 
88
            if value:
 
89
                data[key] = value
 
90
        if primary_group_name:
 
91
            data["primary-gid"] = self.get_gid(primary_group_name)
 
92
        else:
 
93
            data["primary-gid"] = None
 
94
        userdata = (username, "x", data["uid"], data["primary-gid"],
 
95
                    "%s,%s,%s,%s," % (name, location, work_number,
 
96
                                      home_number),
 
97
                    "/bin/sh" , "/home/user")
 
98
        self.provider.users = [userdata]
 
99
        return "set_user_details succeeded"
 
100
 
 
101
    def get_gid(self, name):
 
102
        try:
 
103
            return self._groups[name]["gid"]
 
104
        except KeyError:
 
105
            raise UserManagementError("Group %s wasn't found." % name)
 
106
 
 
107
    def add_group(self, name):
 
108
        gid = 1000
 
109
        if self._groups:
 
110
            gid = max([x["gid"] for x in self._groups.itervalues()]) + 1
 
111
        self._groups[name] = {"name": name, "gid": gid, "members": []}
 
112
        self.update_provider_from_groups()
 
113
        return "add_group succeeded"
 
114
 
 
115
    def set_group_details(self, group, new_name):
 
116
        data = self._groups[group]
 
117
        data["name"] = new_name
 
118
        self._groups[new_name] = data
 
119
        del self._groups[group]
 
120
        self.update_provider_from_groups()
 
121
        return "set_group_details succeeded"
 
122
 
 
123
    def add_group_member(self, username, group):
 
124
        data = self._groups[group]
 
125
        if data:
 
126
            data["members"].append(username)
 
127
            self.update_provider_from_groups()
 
128
            return "add_group_member succeeded"
 
129
        raise UserManagementError("add_group_member failed")
 
130
 
 
131
    def remove_group_member(self, username, group):
 
132
        if group in self._groups:
 
133
            data = self._groups[group]
 
134
            data["members"].remove(username)
 
135
            self.update_provider_from_groups()
 
136
            return "remove_group_member succeeded"
 
137
        raise UserManagementError("remove_group_member failed")
 
138
 
 
139
    def remove_group(self, group):
 
140
        del self._groups[group]
 
141
        self.update_provider_from_groups()
 
142
        return "remove_group succeeded"
 
143
 
 
144
    def update_provider_from_groups(self):
 
145
        provider_list = []
 
146
        for k, v in self._groups.iteritems():
 
147
            provider_list.append((k, "x", v["gid"], v["members"]))
 
148
        self.provider.groups = provider_list
 
149
 
 
150
 
 
151
class FakeUserProvider(UserProviderBase):
 
152
 
 
153
    def __init__(self, users=None, groups=None, popen=None, shadow_file=None, 
 
154
                 locked_users=None):
 
155
        self.users = users
 
156
        self.groups = groups
 
157
        if popen:
 
158
            self.popen = popen
 
159
        self.shadow_file = shadow_file
 
160
        super(FakeUserProvider, self).__init__(locked_users=locked_users)
 
161
 
 
162
    def get_user_data(self, system=False):
 
163
        if self.users is None:
 
164
            self.users = []
 
165
        return self.users
 
166
 
 
167
    def get_group_data(self):
 
168
        if self.groups is None:
 
169
            self.groups = []
 
170
        return self.groups
 
171
 
 
172
class FakeUserInfo(object):
 
173
    """Implements enough functionality to work for Changes tests."""
 
174
 
 
175
    persist_name = "users"
 
176
    run_interval = 60
 
177
 
 
178
    def __init__(self, provider):
 
179
        self._provider = provider
 
180
 
 
181
    def register(self, manager):
 
182
        self._manager = manager
 
183
        self._persist = self._manager.persist.root_at("users")