1
3
from landscape.tests.helpers import (
2
4
LandscapeTest, LandscapeIsolatedTest, RemoteBrokerHelper)
3
5
from landscape.manager.deployment import ManagerService, ManagerConfiguration
4
6
from landscape.manager.processkiller import ProcessKiller
5
7
from landscape.manager.scriptexecution import ALL_USERS
8
from landscape.manager.store import ManagerStore
6
9
from landscape.broker.tests.test_remote import assertTransmitterActive
7
10
from landscape.tests.test_plugin import assertReceivesMessages
12
15
def test_get_plugins(self):
13
16
configuration = ManagerConfiguration()
14
17
configuration.load(["--manager-plugins", "ProcessKiller",
15
"-d", self.make_path()])
18
"-d", self.make_dir()])
16
19
manager_service = ManagerService(configuration)
17
20
plugins = manager_service.plugins
18
21
self.assertEquals(len(plugins), 1)
21
24
def test_get_all_plugins(self):
22
25
configuration = ManagerConfiguration()
23
26
configuration.load(["--manager-plugins", "ALL",
24
"-d", self.make_path()])
27
"-d", self.make_dir()])
25
28
manager_service = ManagerService(configuration)
26
29
self.assertEquals(len(manager_service.plugins), 4)
28
31
def test_include_script_execution(self):
29
32
configuration = ManagerConfiguration()
30
33
configuration.load(["--include-manager-plugins", "ScriptExecution",
31
"-d", self.make_path()])
34
"-d", self.make_dir()])
32
35
manager_service = ManagerService(configuration)
33
36
self.assertEquals(len(manager_service.plugins), 5)
40
43
configuration = ManagerConfiguration()
41
configuration.load(["-d", self.make_path(),
44
configuration.load(["-d", self.make_dir(),
42
45
"--script-users", "foo, bar,baz"])
43
46
self.assertEquals(configuration.get_allowed_script_users(),
44
47
["foo", "bar", "baz"])
51
54
configuration = ManagerConfiguration()
52
configuration.load(["-d", self.make_path(),
55
configuration.load(["-d", self.make_dir(),
53
56
"--script-users", "\tALL "])
54
57
self.assertEquals(configuration.get_allowed_script_users(), ALL_USERS)
58
61
If no script users are specified, the default is 'nobody'.
60
63
configuration = ManagerConfiguration()
61
configuration.load(["-d", self.make_path()])
64
configuration.load(["-d", self.make_dir()])
62
65
self.assertEquals(configuration.get_allowed_script_users(),
67
69
class DeploymentBusTests(LandscapeIsolatedTest):
69
71
helpers = [RemoteBrokerHelper]
71
73
def test_dbus_reactor_transmitter_installed(self):
72
74
configuration = ManagerConfiguration()
73
configuration.load(["-d", self.make_path(), "--bus", "session",
75
configuration.load(["-d", self.make_dir(), "--bus", "session",
74
76
"--manager-plugins", "ProcessKiller"])
75
77
manager_service = ManagerService(configuration)
76
78
manager_service.startService()
80
82
def test_receives_messages(self):
81
83
configuration = ManagerConfiguration()
82
configuration.load(["-d", self.make_path(), "--bus", "session",
84
configuration.load(["-d", self.make_dir(), "--bus", "session",
83
85
"--manager-plugins", "ProcessKiller"])
84
86
manager_service = ManagerService(configuration)
85
87
manager_service.startService()
86
88
return assertReceivesMessages(self, manager_service.dbus_service,
87
89
self.broker_service, self.remote)
91
def test_manager_store(self):
92
configuration = ManagerConfiguration()
93
path = self.make_dir()
94
configuration.load(["-d", path, "--bus", "session",
95
"--manager-plugins", "ProcessKiller"])
96
manager_service = ManagerService(configuration)
97
manager_service.startService()
98
self.assertNotIdentical(manager_service.registry.store, None)
100
isinstance(manager_service.registry.store, ManagerStore))
101
self.assertTrue(os.path.isfile(os.path.join(path, "manager.database")))