~ahasenack/landscape-client/landscape-client-1.5.5-0ubuntu0.9.04.0

« back to all changes in this revision

Viewing changes to landscape/tests/test_deployment.py

  • Committer: Bazaar Package Importer
  • Author(s): Rick Clark
  • Date: 2008-09-08 16:35:57 UTC
  • mfrom: (1.1.1 upstream)
  • Revision ID: james.westby@ubuntu.com-20080908163557-l3ixzj5dxz37wnw2
Tags: 1.0.18-0ubuntu1
New upstream release 

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
import sys
 
2
from optparse import OptionParser
 
3
import logging
 
4
import signal
 
5
 
 
6
from landscape.lib.dbus_util import Object
 
7
from landscape.deployment import (
 
8
    LandscapeService, Configuration, get_versioned_persist,
 
9
    assert_unowned_bus_name)
 
10
from landscape.tests.helpers import (
 
11
    LandscapeTest, LandscapeIsolatedTest, DBusHelper)
 
12
from landscape.tests.mocker import ANY
 
13
 
 
14
 
 
15
class ConfigurationTest(LandscapeTest):
 
16
 
 
17
    def setUp(self):
 
18
        super(ConfigurationTest, self).setUp()
 
19
        self.reset_config()
 
20
 
 
21
    def reset_config(self, configuration_class=None):
 
22
        if not configuration_class:
 
23
            class MyConfiguration(Configuration):
 
24
                default_config_filenames = []
 
25
            configuration_class = MyConfiguration
 
26
        self.config_class = configuration_class
 
27
        self.config = configuration_class()
 
28
        self.parser = self.config.make_parser()
 
29
 
 
30
    def test_get(self):
 
31
        self.write_config_file(log_level="file")
 
32
        self.config.load([])
 
33
        self.assertEquals(self.config.get("log_level"), "file")
 
34
        self.assertEquals(self.config.get("random_key"), None)
 
35
 
 
36
    def write_config_file(self, **kwargs):
 
37
        section_name = kwargs.pop("section_name", "client")
 
38
        config = "\n".join(["[%s]" % (section_name,)] +
 
39
                           ["%s = %s" % pair for pair in kwargs.items()])
 
40
        self.config_filename = self.makeFile(config)
 
41
        self.config.default_config_filenames[:] = [self.config_filename]
 
42
 
 
43
    def test_command_line_has_precedence(self):
 
44
        self.write_config_file(log_level="file")
 
45
        self.config.load(["--log-level", "command line"])
 
46
        self.assertEquals(self.config.log_level, "command line")
 
47
 
 
48
    def test_command_line_option_without_default(self):
 
49
        class MyConfiguration(Configuration):
 
50
            def make_parser(self):
 
51
                parser = OptionParser()
 
52
                # Keep the dash in the option name to ensure it works.
 
53
                parser.add_option("--foo-bar")
 
54
                return parser
 
55
        self.assertEquals(MyConfiguration().foo_bar, None)
 
56
 
 
57
    def test_command_line_with_required_options(self):
 
58
        class MyConfiguration(Configuration):
 
59
            required_options = ("foo_bar",)
 
60
            config = None
 
61
            def make_parser(self):
 
62
                parser = super(MyConfiguration, self).make_parser()
 
63
                # Keep the dash in the option name to ensure it works.
 
64
                parser.add_option("--foo-bar", metavar="NAME")
 
65
                return parser
 
66
        self.reset_config(configuration_class=MyConfiguration)
 
67
        self.write_config_file()
 
68
 
 
69
        sys_exit_mock = self.mocker.replace(sys.exit)
 
70
        sys_exit_mock(ANY)
 
71
        self.mocker.count(1)
 
72
        self.mocker.replay()
 
73
 
 
74
        self.config.load([]) # This will call our mocked sys.exit.
 
75
        self.config.load(["--foo-bar", "ooga"])
 
76
        self.assertEquals(self.config.foo_bar, "ooga")
 
77
 
 
78
    def test_command_line_with_unsaved_options(self):
 
79
        class MyConfiguration(Configuration):
 
80
            unsaved_options = ("foo_bar",)
 
81
            config = None
 
82
            def make_parser(self):
 
83
                parser = super(MyConfiguration, self).make_parser()
 
84
                # Keep the dash in the option name to ensure it works.
 
85
                parser.add_option("--foo-bar", metavar="NAME")
 
86
                return parser
 
87
        self.reset_config(configuration_class=MyConfiguration)
 
88
        self.write_config_file()
 
89
 
 
90
        self.config.load(["--foo-bar", "ooga"])
 
91
        self.assertEquals(self.config.foo_bar, "ooga")
 
92
        self.config.write()
 
93
 
 
94
        self.config.load([])
 
95
        self.assertEquals(self.config.foo_bar, None)
 
96
 
 
97
    def test_config_file_has_precedence_over_default(self):
 
98
        self.write_config_file(log_level="file")
 
99
        self.config.load([])
 
100
        self.assertEquals(self.config.log_level, "file")
 
101
 
 
102
    def test_different_config_file_section(self):
 
103
        class MyConfiguration(Configuration):
 
104
            config_section = "babble"
 
105
            default_config_filenames = []
 
106
            def make_parser(self):
 
107
                parser = super(MyConfiguration, self).make_parser()
 
108
                parser.add_option("--whatever", metavar="STUFF")
 
109
                return parser
 
110
        self.reset_config(configuration_class=MyConfiguration)
 
111
        self.write_config_file(section_name="babble", whatever="yay")
 
112
        self.config.load([])
 
113
        self.assertEquals(self.config.whatever, "yay")
 
114
 
 
115
    def test_write_configuration(self):
 
116
        self.write_config_file(log_level="debug")
 
117
        self.config.log_level = "warning"
 
118
        self.config.write()
 
119
        data = open(self.config_filename).read()
 
120
        self.assertEquals(data.strip(), "[client]\nlog_level = warning")
 
121
 
 
122
    def test_write_on_the_right_default_config_file(self):
 
123
        self.write_config_file(log_level="debug")
 
124
        config_class = self.config_class
 
125
        config_class.default_config_filenames.insert(0, "/non/existent")
 
126
        self.config.load([])
 
127
        self.config.log_level = "warning"
 
128
        self.config.write()
 
129
        data = open(self.config_filename).read()
 
130
        self.assertEquals(data.strip(), "[client]\nlog_level = warning")
 
131
 
 
132
    def test_dont_write_default_options(self):
 
133
        self.write_config_file(log_level="debug")
 
134
        self.config.log_level = "info"
 
135
        self.config.write()
 
136
        data = open(self.config_filename).read()
 
137
        self.assertEquals(data.strip(), "[client]")
 
138
 
 
139
    def test_dont_write_config_option(self):
 
140
        self.write_config_file()
 
141
        self.config.config = self.config_filename
 
142
        self.config.write()
 
143
        data = open(self.config_filename).read()
 
144
        self.assertEquals(data.strip(), "[client]")
 
145
 
 
146
    def test_write_command_line_options(self):
 
147
        self.write_config_file()
 
148
        self.config.load(["--log-level", "warning"])
 
149
        self.config.write()
 
150
        data = open(self.config_filename).read()
 
151
        self.assertEquals(data.strip(), "[client]\nlog_level = warning")
 
152
 
 
153
    def test_write_command_line_precedence(self):
 
154
        """Command line options take precedence over config file when writing.
 
155
        """
 
156
        self.write_config_file(log_level="debug")
 
157
        self.config.load(["--log-level", "warning"])
 
158
        self.config.write()
 
159
        data = open(self.config_filename).read()
 
160
        self.assertEquals(data.strip(), "[client]\nlog_level = warning")
 
161
 
 
162
    def test_write_manually_set_precedence(self):
 
163
        """Manually set options take precedence over command line when writing.
 
164
        """
 
165
        self.write_config_file(log_level="debug")
 
166
        self.config.load(["--log-level", "warning"])
 
167
        self.config.log_level = "error"
 
168
        self.config.write()
 
169
        data = open(self.config_filename).read()
 
170
        self.assertEquals(data.strip(), "[client]\nlog_level = error")
 
171
 
 
172
    def test_write_to_given_config_file(self):
 
173
        filename = self.makeFile()
 
174
        self.config.load(["--log-level", "warning", "--config", filename],
 
175
                         accept_unexistent_config=True)
 
176
        self.config.log_level = "error"
 
177
        self.config.write()
 
178
        data = open(filename).read()
 
179
        self.assertEquals(data.strip(), "[client]\nlog_level = error")
 
180
 
 
181
    def test_bus_option(self):
 
182
        """The bus option must be specified as 'system' or 'session'."""
 
183
        self.assertRaises(SystemExit,
 
184
                          self.config.load,
 
185
                          ["--bus", "foobar"])
 
186
        self.config.load(["--bus", "session"])
 
187
        self.assertEquals(self.config.bus, "session")
 
188
        self.config.load(["--bus", "system"])
 
189
        self.assertEquals(self.config.bus, "system")
 
190
 
 
191
    def test_config_option(self):
 
192
        opts = self.parser.parse_args(["--config", "hello.cfg"])[0]
 
193
        self.assertEquals(opts.config, "hello.cfg")
 
194
 
 
195
    def test_load_config_from_option(self):
 
196
        filename = self.makeFile("[client]\nhello = world\n")
 
197
        self.config.load(["--config", filename])
 
198
        self.assertEquals(self.config.hello, "world")
 
199
 
 
200
    def test_load_typed_option_from_file(self):
 
201
        class MyConfiguration(self.config_class):
 
202
            def make_parser(self):
 
203
                parser = super(MyConfiguration, self).make_parser()
 
204
                parser.add_option("--year", default=1, type="int")
 
205
                return parser
 
206
        filename = self.makeFile("[client]\nyear = 2008\n")
 
207
        config = MyConfiguration()
 
208
        config.load(["--config", filename])
 
209
        self.assertEquals(config.year, 2008)
 
210
 
 
211
    def test_load_typed_option_from_command_line(self):
 
212
        class MyConfiguration(self.config_class):
 
213
            def make_parser(self):
 
214
                parser = super(MyConfiguration, self).make_parser()
 
215
                parser.add_option("--year", default=1, type="int")
 
216
                return parser
 
217
        config = MyConfiguration()
 
218
        config.load(["--year", "2008"])
 
219
        self.assertEquals(config.year, 2008)
 
220
 
 
221
    def test_reload(self):
 
222
        filename = self.makeFile("[client]\nhello = world1\n")
 
223
        self.config.load(["--config", filename])
 
224
        open(filename, "w").write("[client]\nhello = world2\n")
 
225
        self.config.reload()
 
226
        self.assertEquals(self.config.hello, "world2")
 
227
 
 
228
    def test_data_directory_option(self):
 
229
        opts = self.parser.parse_args(["--data-path", "/opt/hoojy/var/run"])[0]
 
230
        self.assertEquals(opts.data_path, "/opt/hoojy/var/run")
 
231
 
 
232
    def test_data_directory_default(self):
 
233
        opts = self.parser.parse_args([])[0]
 
234
        self.assertEquals(opts.data_path, "/var/lib/landscape/client/")
 
235
 
 
236
    def test_log_file_option(self):
 
237
        opts = self.parser.parse_args(["--log-dir",
 
238
                                       "/var/log/my-awesome-log"])[0]
 
239
        self.assertEquals(opts.log_dir, "/var/log/my-awesome-log")
 
240
 
 
241
    def test_log_level_option(self):
 
242
        opts = self.parser.parse_args([])[0]
 
243
        self.assertEquals(opts.log_level, "info")
 
244
        opts = self.parser.parse_args(["--log-level", "debug"])[0]
 
245
        self.assertEquals(opts.log_level, "debug")
 
246
 
 
247
    def test_quiet_option(self):
 
248
        opts = self.parser.parse_args(["--quiet"])[0]
 
249
        self.assertEquals(opts.quiet, True)
 
250
 
 
251
    def test_quiet_default(self):
 
252
        opts = self.parser.parse_args([])[0]
 
253
        self.assertEquals(opts.quiet, False)
 
254
 
 
255
    def test_ignore_sigint_option(self):
 
256
        opts = self.parser.parse_args(["--ignore-sigint"])[0]
 
257
        self.assertEquals(opts.ignore_sigint, True)
 
258
 
 
259
    def test_ignore_sigint_default(self):
 
260
        opts = self.parser.parse_args([])[0]
 
261
        self.assertEquals(opts.ignore_sigint, False)
 
262
 
 
263
 
 
264
class GetVersionedPersistTest(LandscapeTest):
 
265
 
 
266
    def test_upgrade_service(self):
 
267
        class FakeService(object):
 
268
            persist_filename = self.make_path(content="")
 
269
            service_name = "monitor"
 
270
 
 
271
        upgrade_managers = self.mocker.replace(
 
272
            "landscape.upgraders.UPGRADE_MANAGERS", passthrough=False)
 
273
        upgrade_manager = upgrade_managers["monitor"]
 
274
        upgrade_manager.apply(ANY)
 
275
 
 
276
        stash = []
 
277
        self.mocker.call(stash.append)
 
278
        self.mocker.replay()
 
279
 
 
280
        persist = get_versioned_persist(FakeService())
 
281
        self.assertEquals(stash[0], persist)
 
282
 
 
283
 
 
284
class LandscapeServiceTest(LandscapeTest):
 
285
 
 
286
    def test_create_persist(self):
 
287
        class FakeService(LandscapeService):
 
288
            persist_filename = self.make_path(content="")
 
289
            service_name = "monitor"
 
290
        service = FakeService(None)
 
291
        self.assertEquals(service.persist.filename, service.persist_filename)
 
292
 
 
293
    def test_no_persist_without_filename(self):
 
294
        class FakeService(LandscapeService):
 
295
            service_name = "monitor"
 
296
        service = FakeService(None)
 
297
        self.assertFalse(hasattr(service, "persist"))
 
298
 
 
299
    def test_usr1_rotates_logs(self):
 
300
        """
 
301
        SIGUSR1 should cause logs to be reopened.
 
302
        """
 
303
        logging.getLogger().addHandler(logging.FileHandler(self.make_path()))
 
304
        # Store the initial set of handlers
 
305
        original_streams = [handler.stream for handler in
 
306
                            logging.getLogger().handlers if
 
307
                            isinstance(handler, logging.FileHandler)]
 
308
 
 
309
        # Instantiating LandscapeService should register the handler
 
310
        LandscapeService(None)
 
311
        # We'll call it directly
 
312
        handler = signal.getsignal(signal.SIGUSR1)
 
313
        self.assertTrue(handler)
 
314
        handler(None, None)
 
315
        new_streams = [handler.stream for handler in
 
316
                       logging.getLogger().handlers if
 
317
                       isinstance(handler, logging.FileHandler)]
 
318
 
 
319
        for stream in new_streams:
 
320
            self.assertTrue(stream not in original_streams)
 
321
 
 
322
 
 
323
 
 
324
class AssertUnownedBusNameTest(LandscapeIsolatedTest):
 
325
 
 
326
    helpers = [DBusHelper]
 
327
 
 
328
    class BoringService(Object):
 
329
        bus_name = "com.example.BoringService"
 
330
        object_path = "/com/example/BoringService"
 
331
 
 
332
    def test_raises_sysexit_when_owned(self):
 
333
        service = self.BoringService(self.bus)
 
334
        self.assertRaises(SystemExit, assert_unowned_bus_name,
 
335
                          self.bus, self.BoringService.bus_name)
 
336
 
 
337
    def test_do_nothing_when_unowned(self):
 
338
        assert_unowned_bus_name(self.bus, self.BoringService.bus_name)
 
339