50
46
{key: Equals(value) for key, value in expected.items()})
53
def setLegacyObservers(observers):
    """Swap the legacy log publisher's observers for `observers`.

    Every observer currently reported by `log.theLogPublisher.observers`
    is removed first; each of the given observers is then added, in the
    order supplied.
    """
    publisher = log.theLogPublisher
    # Clear out whatever is registered right now...
    for registered in publisher.observers:
        publisher.removeObserver(registered)
    # ...then install the replacements.
    for replacement in observers:
        publisher.addObserver(replacement)
61
class TestLegacyFileLogObserver(MAASTestCase):
62
"""Scenario tests for `LegacyFileLogObserver`."""
65
(log_level.name, dict(log_level=log_level))
66
for log_level in logger.LogLevel.iterconstants()
69
def test__namespace_and_level_is_printed_in_legacy_log(self):
70
# Restore existing observers at the end. This must be careful with
71
# ordering of clean-ups, hence the use of unittest.mock.patch.object
72
# as a context manager.
73
self.addCleanup(setLegacyObservers, log.theLogPublisher.observers)
74
# The global non-legacy `LogBeginner` emits critical messages straight
75
# to stderr, so temporarily put aside its observer to avoid seeing the
76
# critical log messages we're going to generate.
77
self.patch(logger.globalLogPublisher, "_observers", [])
79
logbuffer = io.StringIO()
80
observer = LegacyFileLogObserver(logbuffer)
81
observer.formatTime = lambda when: "<timestamp>"
84
# Deliberately use the default global observer in the new logger
85
# because we want to see how it behaves in a typical environment where
86
# logs are being emitted by the legacy logging infrastructure, for
87
# example running under `twistd`.
88
newlog = partial(logger.Logger().emit, self.log_level)
91
log, "LegacyLogObserverWrapper",
92
log.LegacyLogObserverWrapper):
93
setLegacyObservers([observer.emit])
94
oldlog("Message from legacy", system="legacy")
95
newlog("Message from modern", log_system="modern")
98
logbuffer.getvalue(), DocTestMatches("""\
99
<timestamp> legacy: [info] Message from legacy
100
<timestamp> modern: [%s] Message from modern
101
""" % self.log_level.name))
104
class TestLegacyFileLogObserver_Other(MAASTestCase):
105
"""Other tests for `LegacyFileLogObserver`."""
107
def test__namespace_is_not_emitted_via_logfile_logerr(self):
108
# Restore existing observers at the end. This must be careful with
109
# ordering of clean-ups, hence the use of unittest.mock.patch.object
110
# as a context manager.
111
self.addCleanup(setLegacyObservers, log.theLogPublisher.observers)
112
# The global non-legacy `LogBeginner` emits critical messages straight
113
# to stderr, so temporarily put aside its observer to avoid seeing the
114
# critical log messages we're going to generate.
115
self.patch(logger.globalLogPublisher, "_observers", [])
117
logbuffer = io.StringIO()
118
observer = LegacyFileLogObserver(logbuffer)
119
observer.formatTime = lambda when: "<timestamp>"
122
log, "LegacyLogObserverWrapper",
123
log.LegacyLogObserverWrapper):
124
setLegacyObservers([observer.emit])
125
log.logfile.write("Via log.logfile\n")
126
log.logerr.write("Via log.logerr\n")
129
logbuffer.getvalue(), DocTestMatches("""\
130
<timestamp> -: [info] Via log.logfile
131
<timestamp> -: [error] Via log.logerr
135
class TestLegacyLogObserverWrapper(MAASTestCase):
136
"""Scenario tests for `LegacyLogObserverWrapper`."""
139
(log_level.name, dict(log_level=log_level))
140
for log_level in logger.LogLevel.iterconstants()
143
def processEvent(self, event):
145
observer = LegacyLogObserverWrapper(events.append)
147
self.assertThat(events, HasLength(1))
150
def test__adds_system_to_event(self):
152
# This is a `twisted.logger` event, not legacy, and requires
153
# values for `log_time` and `log_level` at a minimum.
155
"log_time": sentinel.log_time,
156
"log_level": self.log_level,
159
Not(Contains("log_system")),
160
ContainsDictByEquality({"system": "-"}),
164
def test__adds_log_system_and_system_to_event_with_namespace(self):
165
log_namespace = factory.make_name("log_namespace")
168
"log_time": sentinel.log_time,
169
"log_level": self.log_level,
170
"log_namespace": log_namespace,
172
ContainsDictByEquality({
173
"log_system": log_namespace,
174
"system": log_namespace,
178
def test__adds_system_to_legacy_event(self):
180
# This is a `twisted.python.log` event, i.e. legacy, and requires
181
# values for `time` and `isError` at a minimum.
183
"time": sentinel.time,
184
"isError": factory.pick_bool(),
187
Not(Contains("log_system")),
188
ContainsDictByEquality({"system": "-"}),
192
def test__preserves_log_system_in_event(self):
193
log_system = factory.make_name("log_system")
196
"log_time": sentinel.time,
197
"log_level": self.log_level,
198
"log_system": log_system,
200
# `log_system` is not modified; `system` is set to match.
201
ContainsDictByEquality({
202
"log_system": log_system,
203
"system": log_system,
207
def test__preserves_system_in_legacy_event(self):
208
system = factory.make_name("system")
211
"time": sentinel.time,
212
"isError": factory.pick_bool(),
216
# `log_system` is not added when `system` already exists.
217
Not(Contains("log_system")),
218
ContainsDictByEquality({
225
class TestLegacyLogObserverWrapper_Installation(MAASTestCase):
226
"""Tests for `LegacyLogObserverWrapper`."""
230
# Restore existing observers at the end. Tests must be careful with
231
# ordering of clean-ups, hence the use of unittest.mock.patch.object
232
# as a context manager in the tests themselves.
233
self.addCleanup(setLegacyObservers, log.theLogPublisher.observers)
235
def test__installs_wrapper_to_log_module(self):
236
with patch.object(log, "LegacyLogObserverWrapper", sentinel.unchanged):
238
log.LegacyLogObserverWrapper,
239
Is(sentinel.unchanged))
240
LegacyLogObserverWrapper.install()
242
log.LegacyLogObserverWrapper,
243
Is(LegacyLogObserverWrapper))
245
def test__rewraps_existing_observers(self):
249
def __init__(self, observer):
250
self.legacyObserver = observer
252
def __call__(self, event):
253
return self.legacyObserver(event)
255
with patch.object(log, "LegacyLogObserverWrapper", OldWrapper):
257
observers = (lambda event: event), (lambda event: event)
258
setLegacyObservers(observers)
260
# Our legacy observers are all registered.
262
log.theLogPublisher.observers,
263
MatchesSetwise(*map(Is, observers)))
264
# Behind the scenes they're all wrapped with OldWrapper.
266
log.theLogPublisher._legacyObservers,
267
AllMatch(IsInstance(OldWrapper)))
268
# They're registered with the new global log publisher too.
270
logger.globalLogPublisher._observers,
271
ContainsAll(log.theLogPublisher._legacyObservers))
274
LegacyLogObserverWrapper.install()
276
# Our legacy observers are all still registered.
278
log.theLogPublisher.observers,
279
MatchesSetwise(*map(Is, observers)))
280
# Behind the scenes they're now all wrapped with our wrapper.
282
log.theLogPublisher._legacyObservers,
283
AllMatch(IsInstance(LegacyLogObserverWrapper)))
284
# They're registered with the new global log publisher too.
286
logger.globalLogPublisher._observers,
287
ContainsAll(log.theLogPublisher._legacyObservers))
290
49
def formatTimeStatic(when):
291
50
"""Just return <when>."""
460
219
with TwistedLoggerFixture() as logger:
461
220
observe_twisted_internet_unix(event)
462
221
self.assertThat(logger.events, Contains(event))
224
class TestGetSystemName(MAASTestCase):
225
"""Tests for `_getSystemName`."""
228
"foo.bar.baz": "foo.bar.baz",
229
"f_o.bar.baz": "f_o.bar.baz",
230
"foo.b_r.baz": "foo.b_r.baz",
231
"foo.bar.b_z": "foo.bar.b_z",
232
"foo.bar._az": "foo.bar",
233
"foo._ar.baz": "foo",
234
"foo._ar._az": "foo",
243
string_in or repr(string_in),
244
string_out or repr(string_out)),
245
{"string_in": string_in, "string_out": string_out})
246
for string_in, string_out in expectations.items()
251
_getSystemName(self.string_in),
252
Equals(self.string_out))
255
class TestFormatModernEvent(MAASTestCase):
256
"""Tests for `_formatModernEvent`."""
259
(level.name, {"log_level": level})
260
for level in logger.LogLevel.iterconstants()
263
def test_format_basics(self):
264
thing1 = factory.make_name("thing")
265
thing2 = factory.make_name("thing")
266
log_system = factory.make_name("system")
267
log_format = ">{thing1}< >{thing2}<"
268
log_time = pick_log_time()
271
"log_time": log_time,
272
"log_format": log_format,
273
"log_system": log_system,
274
"log_level": self.log_level,
279
"%s %s: [%s] >%s< >%s<\n" % (
280
logger.formatTime(log_time, DEFAULT_LOG_FORMAT_DATE),
281
log_system, self.log_level.name, thing1, thing2),
285
def test_formats_without_format(self):
288
"log_level": self.log_level,
290
Equals("- -: [%s] \n" % self.log_level.name),
293
def test_formats_with_null_format(self):
297
"log_level": self.log_level,
299
Equals("- -: [%s] \n" % self.log_level.name),
302
def test_formats_without_time(self):
305
"log_level": self.log_level,
307
Equals("- -: [%s] \n" % self.log_level.name),
310
def test_formats_with_null_time(self):
314
"log_level": self.log_level,
316
Equals("- -: [%s] \n" % self.log_level.name),
319
def test_uses_namespace_if_system_missing(self):
320
log_namespace = factory.make_name("namespace")
323
"log_level": self.log_level,
324
"log_namespace": log_namespace,
328
log_namespace, self.log_level.name),
332
def test_uses_namespace_if_system_null(self):
333
log_namespace = factory.make_name("namespace")
336
"log_level": self.log_level,
337
"log_namespace": log_namespace,
342
log_namespace, self.log_level.name),
347
class TestEventLogger(MAASTestCase):
348
"""Tests for `EventLogger`."""
351
(level.name, {"log_level": level})
352
for level in logger.LogLevel.iterconstants()
356
super(TestEventLogger, self).setUp()
357
self.output = io.StringIO()
358
self.log = EventLogger(self.output)
359
self.get_logs = lambda: find_log_lines(self.output.getvalue())
361
def setLogLevel(self, log_level):
    """Set the level at which events will be logged.

    This is not a minimum level, it is an absolute level.

    :param log_level: The level to log at; it is patched into
        `_twisted._filterByLevels` as a one-element set.
    """
    # Use the argument rather than `self.log_level`, so the level the
    # caller asked for is the one that actually takes effect.
    self.patch(_twisted, "_filterByLevels", {log_level})
368
def test_basics(self):
369
self.setLogLevel(self.log_level)
370
event = make_event(log_level=self.log_level)
371
event["log_system"] = factory.make_name("system")
373
self.assertSequenceEqual(
374
[(event["log_system"], self.log_level.name, event["log_text"])],
378
def test_filters_by_level(self):
379
self.setLogLevel(self.log_level)
381
log_level: make_event(log_level=log_level)
382
for log_level in logger.LogLevel.iterconstants()
384
for event in events.values():
386
# Only the log at the current level will get through.
387
self.assertSequenceEqual(
388
[("-", self.log_level.name, events[self.log_level]["log_text"])],
392
def test_filters_by_noise(self):
393
self.setLogLevel(self.log_level)
394
common = dict(log_namespace="log_legacy", log_system="-")
396
make_event("Log opened.", **common),
397
make_event("Main loop terminated.", **common),
402
make_event(log_level=self.log_level, **common),
406
# Only the `okay` logs will get through.
408
("-", self.log_level.name, event["log_text"])
411
self.assertSequenceEqual(expected, self.get_logs())