1
from landscape.lib.dbus_util import get_object
3
3
from landscape.lib.persist import Persist
5
from landscape.manager.manager import SUCCEEDED, FAILED
6
from landscape.monitor.monitor import MonitorPluginRegistry
7
from landscape.manager.manager import ManagerPluginRegistry
4
from landscape.manager.plugin import SUCCEEDED, FAILED
8
5
from landscape.monitor.usermonitor import UserMonitor
9
from landscape.manager.usermanager import UserManager, UserManagerDBusObject
6
from landscape.manager.usermanager import (
7
UserManager, RemoteUserManagerConnector)
10
8
from landscape.user.tests.helpers import FakeUserProvider, FakeUserManagement
11
from landscape.tests.helpers import LandscapeIsolatedTest
9
from landscape.tests.helpers import LandscapeTest, ManagerHelper
12
10
from landscape.user.provider import UserManagementError
13
from landscape.tests.helpers import RemoteBrokerHelper
16
class ManagerServiceTest(LandscapeIsolatedTest):
18
helpers = [RemoteBrokerHelper]
21
super(ManagerServiceTest, self).setUp()
23
def test_remote_locked_usernames(self):
25
def check_result(result):
26
self.assertEquals(result, ["psmith"])
28
self.shadow_file = self.makeFile("""\
29
jdoe:$1$xFlQvTqe$cBtrNEDOIKMy/BuJoUdeG0:13348:0:99999:7:::
30
psmith:!:13348:0:99999:7:::
31
sbarnes:$1$q7sz09uw$q.A3526M/SHu8vUb.Jo1A/:13349:0:99999:7:::
33
UserManagerDBusObject(self.broker_service.bus,
34
shadow_file=self.shadow_file)
35
remote_service = get_object(self.broker_service.bus,
36
UserManagerDBusObject.bus_name, UserManagerDBusObject.object_path)
38
result = remote_service.get_locked_usernames()
39
result.addCallback(check_result)
42
def test_remote_empty_shadow_file(self):
44
def check_result(result):
45
self.assertEquals(result, [])
47
self.shadow_file = self.makeFile("\n")
48
UserManagerDBusObject(self.broker_service.bus,
49
shadow_file=self.shadow_file)
50
remote_service = get_object(self.broker_service.bus,
51
UserManagerDBusObject.bus_name, UserManagerDBusObject.object_path)
53
result = remote_service.get_locked_usernames()
54
result.addCallback(check_result)
57
class UserGroupTestBase(LandscapeIsolatedTest):
59
helpers = [RemoteBrokerHelper]
13
class UserGroupTestBase(LandscapeTest):
15
helpers = [ManagerHelper]
62
18
super(UserGroupTestBase, self).setUp()
63
self.persist = Persist()
64
self.monitor = MonitorPluginRegistry(
65
self.remote, self.broker_service.reactor,
66
self.broker_service.config, self.broker_service.bus,
69
19
self.shadow_file = self.makeFile("""\
70
20
jdoe:$1$xFlQvTqe$cBtrNEDOIKMy/BuJoUdeG0:13348:0:99999:7:::
71
21
psmith:!:13348:0:99999:7:::
74
24
accepted_types = ["operation-result", "users"]
75
25
self.broker_service.message_store.set_accepted_types(accepted_types)
76
self.manager = ManagerPluginRegistry(
77
self.remote, self.broker_service.reactor,
78
self.broker_service.config, self.broker_service.bus)
28
super(UserGroupTestBase, self).tearDown()
29
for plugin in self.plugins:
80
32
def setup_environment(self, users, groups, shadow_file):
81
33
provider = FakeUserProvider(users=users, groups=groups,
82
34
shadow_file=shadow_file)
83
plugin = UserMonitor(provider=provider)
84
self.monitor.add(plugin)
85
manager = UserManager(management=FakeUserManagement(provider=provider),
86
shadow_file=shadow_file)
87
self.manager.add(manager)
35
user_monitor = UserMonitor(provider=provider)
36
management = FakeUserManagement(provider=provider)
37
user_manager = UserManager(management=management,
38
shadow_file=shadow_file)
39
self.manager.persist = Persist()
40
user_monitor.register(self.manager)
41
user_manager.register(self.manager)
42
self.plugins = [user_monitor, user_manager]
91
46
class UserOperationsMessagingTest(UserGroupTestBase):
245
205
"name": "John Doe",
246
206
"primary-gid": 1001}],
247
207
"timestamp": 0, "type": "users",
248
"operation-id": 99},])
208
"operation-id": 99}])
250
users = [("jdoe", "x", 1001, 1000, "John Doe,,,,", "/home/bo", "/bin/zsh")]
210
users = [("jdoe", "x", 1001, 1000, "John Doe,,,,",
211
"/home/bo", "/bin/zsh")]
251
212
groups = [("users", "x", 1001, [])]
252
213
self.setup_environment(users, groups, None)
253
214
result = self.manager.dispatch_message(
276
237
def handle_callback2(result, messages):
277
new_messages = self.broker_service.message_store.get_pending_messages()
238
mstore = self.broker_service.message_store
239
new_messages = mstore.get_pending_messages()
278
240
self.assertEquals(messages, new_messages)
281
users = [("jdoe", "x", 1000, 1000, "John Doe,,,,", "/home/bo", "/bin/zsh")]
243
users = [("jdoe", "x", 1000, 1000, "John Doe,,,,",
244
"/home/bo", "/bin/zsh")]
282
245
plugin = self.setup_environment(users, [], None)
283
246
result = self.manager.dispatch_message(
284
247
{"username": "jdoe", "password": "password", "name": "John Doe",
295
258
should first detect changes and then perform the operation.
296
259
The results should be reported in separate messages.
298
262
def handle_callback(result):
299
263
messages = self.broker_service.message_store.get_pending_messages()
300
264
self.assertEquals(len(messages), 3)
301
265
self.assertMessages([messages[0], messages[2]],
302
266
[{"type": "users",
303
"create-group-members": {u"users": [u"jdoe"]},
267
"create-group-members": {u"users":
304
269
"create-groups": [{"gid": 1001,
305
270
"name": u"users"}],
306
271
"create-users": [{"home-phone": None,
380
346
self.assertMessages(messages,
381
347
[{"type": "operation-result", "status": FAILED,
382
348
"operation-id": 39, "timestamp": 0,
383
"result-text": failure_string}
349
"result-text": failure_string}])
386
351
self.setup_environment([], [], None)
387
352
result = self.manager.dispatch_message(
402
366
about the change and an C{operation-result} with details of
403
367
the outcome of the operation.
405
370
def handle_callback(result):
406
messages = self.broker_service.message_store.get_pending_messages()
372
self.broker_service.message_store.get_pending_messages())
407
373
self.assertEquals(len(messages), 3)
408
374
# Ignore the message created by plugin.run.
409
375
self.assertMessages([messages[2], messages[1]],
414
380
"operation-id": 39, "timestamp": 0,
415
381
"result-text": "remove_user succeeded"}])
417
users = [("jdoe", "x", 1000, 1000, "John Doe,,,,", "/home/bo", "/bin/zsh")]
383
users = [("jdoe", "x", 1000, 1000,
384
"John Doe,,,,", "/home/bo", "/bin/zsh")]
418
385
self.setup_environment(users, [], None)
419
386
result = self.manager.dispatch_message(
420
387
{"username": "jdoe",
498
467
C{operation-result} with details of the outcome of the
501
471
def handle_callback(result):
502
messages = self.broker_service.message_store.get_pending_messages()
503
self.assertEquals(len(messages), 3, messages)
504
# Ignore the message created by plugin.run.
505
self.assertMessages([messages[2], messages[1]],
506
[{"timestamp": 0, "type": "users", "operation-id": 99,
507
"update-users": [{"home-phone": None,
514
"name": u"John Doe"}]},
515
{"type": "operation-result",
517
"operation-id": 99, "timestamp": 0,
518
"result-text": "lock_user succeeded"}])
472
messages = self.broker_service.message_store.get_pending_messages()
473
self.assertEquals(len(messages), 3, messages)
474
# Ignore the message created by plugin.run.
475
self.assertMessages([messages[2], messages[1]],
476
[{"timestamp": 0, "type": "users", "operation-id": 99,
477
"update-users": [{"home-phone": None,
484
"name": u"John Doe"}]},
485
{"type": "operation-result",
487
"operation-id": 99, "timestamp": 0,
488
"result-text": "lock_user succeeded"}])
520
490
users = [("jdoe", "x", 1000, 1000, "John Doe,,,,", "/home/bo", "/bin/zsh")]
521
491
self.setup_environment(users, [], self.shadow_file)
747
723
users = [("psmith", "x", 1000, 1000, "Paul Smith,,,,", "/home/psmith",
749
plugin = self.setup_environment(users, [], self.shadow_file)
725
self.setup_environment(users, [], self.shadow_file)
751
727
result = self.manager.dispatch_message(
752
728
{"username": "psmith",
1011
994
"create-group-members": {"bizdev": ["jdoe"]}},
1012
995
{"type": "operation-result", "timestamp": 0,
1013
996
"status": SUCCEEDED, "operation-id": 123,
1014
"result-text": "add_group_member succeeded"}
997
"result-text": "add_group_member succeeded"}])
1018
1000
users = [("jdoe", "x", 1000, 1000, "John Doe,,,,", "/bin/sh",
1088
1072
users = [("jdoe", "x", 1000, 1000, "John Doe,,,,", "/bin/sh",
1090
1074
groups = [("bizdev", "x", 1001, [])]
1091
plugin = self.setup_environment(users, groups, None)
1075
self.setup_environment(users, groups, None)
1092
1076
result = self.manager.dispatch_message(
1093
1077
{"username": "jdoe",
1094
1078
"groupname": "bizdev",
1298
1287
result = plugin.run()
1299
1288
result.addCallback(handle_callback1)
1292
class UserManagerTest(LandscapeTest):
1295
super(UserManagerTest, self).setUp()
1296
self.shadow_file = self.makeFile()
1297
self.user_manager = UserManager(shadow_file=self.shadow_file)
1299
def test_get_locked_usernames(self):
1301
The L{UserManager.get_locked_usernames} method returns only user names
1304
fd = open(self.shadow_file, "w")
1305
fd.write("jdoe:$1$xFlQvTqe$cBtrNEDOIKMy/BuJoUdeG0:13348:0:99999:7:::\n"
1306
"psmith:!:13348:0:99999:7:::\n"
1307
"yo:$1$q7sz09uw$q.A3526M/SHu8vUb.Jo1A/:13349:0:99999:7:::\n")
1309
self.assertEquals(self.user_manager.get_locked_usernames(), ["psmith"])
1311
def test_get_locked_usernames_with_empty_shadow_file(self):
1313
The L{UserManager.get_locked_usernames} method returns an empty C{list}
1314
if the shadow file is empty.
1316
fd = open(self.shadow_file, "w")
1319
self.assertEquals(self.user_manager.get_locked_usernames(), [])
1321
def test_get_locked_usernames_with_non_existing_shadow_file(self):
1323
The L{UserManager.get_locked_usernames} method returns an empty C{list}
1324
if the shadow file can't be read. An error message is logged as well.
1326
self.log_helper.ignore_errors("Error reading shadow file.*")
1327
self.assertFalse(os.path.exists(self.shadow_file))
1328
self.assertEquals(self.user_manager.get_locked_usernames(), [])
1329
self.assertIn("Error reading shadow file. [Errno 2] No such file or "
1330
"directory", self.logfile.getvalue())
1333
class RemoteUserManagerTest(LandscapeTest):
1335
helpers = [ManagerHelper]
1338
super(RemoteUserManagerTest, self).setUp()
1340
def set_remote(remote):
1341
self.remote_user_manager = remote
1343
self.shadow_file = self.makeFile()
1344
self.user_manager = UserManager(shadow_file=self.shadow_file)
1345
self.user_manager_connector = RemoteUserManagerConnector(self.reactor,
1347
self.user_manager.register(self.manager)
1348
connected = self.user_manager_connector.connect()
1349
return connected.addCallback(set_remote)
1352
self.user_manager_connector.disconnect()
1353
self.user_manager.stop()
1354
return super(RemoteUserManagerTest, self).tearDown()
1356
def test_get_locked_usernames(self):
1358
The L{get_locked_usernames} method forwards the request to the
1359
remote L{UserManager} object.
1361
self.user_manager.get_locked_usernames = self.mocker.mock()
1362
self.expect(self.user_manager.get_locked_usernames()).result(["fred"])
1363
self.mocker.replay()
1364
result = self.remote_user_manager.get_locked_usernames()
1365
return self.assertSuccess(result, ["fred"])