1
# Copyright (c) 2001-2010 Twisted Matrix Laboratories.
2
# See LICENSE for details.
5
Tests for L{twisted.python.log}.
8
import os, sys, time, logging, warnings
9
from cStringIO import StringIO
11
from twisted.trial import unittest
13
from twisted.python import log, failure
16
class FakeWarning(Warning):
18
A unique L{Warning} subclass used by tests for interactions of
19
L{twisted.python.log} with the L{warnings} module.
24
class LogTest(unittest.TestCase):
28
self.observer = self.catcher.append
29
log.addObserver(self.observer)
30
self.addCleanup(log.removeObserver, self.observer)
33
def testObservation(self):
34
catcher = self.catcher
35
log.msg("test", testShouldCatch=True)
37
self.assertEquals(i["message"][0], "test")
38
self.assertEquals(i["testShouldCatch"], True)
39
self.failUnless(i.has_key("time"))
40
self.assertEquals(len(catcher), 0)
43
def testContext(self):
44
catcher = self.catcher
45
log.callWithContext({"subsystem": "not the default",
49
{"subsubsystem": "b"}, log.msg, "foo", other="d")
51
self.assertEquals(i['subsubsystem'], 'b')
52
self.assertEquals(i['subsystem'], 'not the default')
53
self.assertEquals(i['other'], 'd')
54
self.assertEquals(i['message'][0], 'foo')
57
for e, ig in [("hello world","hello world"),
58
(KeyError(), KeyError),
59
(failure.Failure(RuntimeError()), RuntimeError)]:
61
i = self.catcher.pop()
62
self.assertEquals(i['isError'], 1)
63
self.flushLoggedErrors(ig)
65
def testErrorsWithWhy(self):
66
for e, ig in [("hello world","hello world"),
67
(KeyError(), KeyError),
68
(failure.Failure(RuntimeError()), RuntimeError)]:
70
i = self.catcher.pop()
71
self.assertEquals(i['isError'], 1)
72
self.assertEquals(i['why'], 'foobar')
73
self.flushLoggedErrors(ig)
76
def test_erroneousErrors(self):
78
Exceptions raised by log observers are logged but the observer which
79
raised the exception remains registered with the publisher. These
80
exceptions do not prevent the event from being sent to other observers
81
registered with the publisher.
88
for observer in [L1.append, broken, L2.append]:
89
log.addObserver(observer)
90
self.addCleanup(log.removeObserver, observer)
93
# Reset the lists for simpler comparison.
97
# Send out the event which will break one of the observers.
98
log.msg("Howdy, y'all.")
100
# The broken observer should have caused this to be logged. There
101
# is a slight bug with LogPublisher - when it logs an error from an
102
# observer, it uses the global "err", which is not necessarily
103
# associated with it, but which may be associated with a different
104
# LogPublisher! See #3307.
105
excs = self.flushLoggedErrors(ZeroDivisionError)
106
self.assertEqual(len(excs), 1)
108
# Both other observers should have seen the message.
109
self.assertEquals(len(L1), 2)
110
self.assertEquals(len(L2), 2)
112
# The order is slightly wrong here. The first event should be
113
# delivered to all observers; then, errors should be delivered.
114
self.assertEquals(L1[1]['message'], ("Howdy, y'all.",))
115
self.assertEquals(L2[0]['message'], ("Howdy, y'all.",))
118
def test_showwarning(self):
120
L{twisted.python.log.showwarning} emits the warning as a message
121
to the Twisted logging system.
123
publisher = log.LogPublisher()
124
publisher.addObserver(self.observer)
126
publisher.showwarning(
127
FakeWarning("unique warning message"), FakeWarning,
128
"warning-filename.py", 27)
129
event = self.catcher.pop()
131
event['format'] % event,
132
'warning-filename.py:27: twisted.test.test_log.FakeWarning: '
133
'unique warning message')
134
self.assertEqual(self.catcher, [])
136
# Python 2.6 requires that any function used to override the
137
# warnings.showwarning API accept a "line" parameter or a
138
# deprecation warning is emitted.
139
publisher.showwarning(
140
FakeWarning("unique warning message"), FakeWarning,
141
"warning-filename.py", 27, line=object())
142
event = self.catcher.pop()
144
event['format'] % event,
145
'warning-filename.py:27: twisted.test.test_log.FakeWarning: '
146
'unique warning message')
147
self.assertEqual(self.catcher, [])
150
def test_warningToFile(self):
152
L{twisted.python.log.showwarning} passes warnings with an explicit file
153
target on to the underlying Python warning system.
155
message = "another unique message"
156
category = FakeWarning
157
filename = "warning-filename.py"
161
log.showwarning(message, category, filename, lineno, file=output)
165
warnings.formatwarning(message, category, filename, lineno))
167
# In Python 2.6, warnings.showwarning accepts a "line" argument which
168
# gives the source line the warning message is to include.
169
if sys.version_info >= (2, 6):
172
log.showwarning(message, category, filename, lineno, file=output,
177
warnings.formatwarning(message, category, filename, lineno,
181
class FakeFile(list):
182
def write(self, bytes):
194
return "Happy Evil Repr"
198
class EvilReprStr(EvilStr, EvilRepr):
201
class LogPublisherTestCaseMixin:
204
Add a log observer which records log events in C{self.out}. Also,
205
make sure the default string encoding is ASCII so that
206
L{testSingleUnicode} can test the behavior of logging unencodable
209
self.out = FakeFile()
210
self.lp = log.LogPublisher()
211
self.flo = log.FileLogObserver(self.out)
212
self.lp.addObserver(self.flo.emit)
215
str(u'\N{VULGAR FRACTION ONE HALF}')
216
except UnicodeEncodeError:
217
# This is the behavior we want - don't change anything.
218
self._origEncoding = None
221
self._origEncoding = sys.getdefaultencoding()
222
sys.setdefaultencoding('ascii')
227
Verify that everything written to the fake file C{self.out} was a
228
C{str}. Also, restore the default string encoding to its previous
229
setting, if it was modified by L{setUp}.
231
for chunk in self.out:
232
self.failUnless(isinstance(chunk, str), "%r was not a string" % (chunk,))
234
if self._origEncoding is not None:
235
sys.setdefaultencoding(self._origEncoding)
236
del sys.setdefaultencoding
240
class LogPublisherTestCase(LogPublisherTestCaseMixin, unittest.TestCase):
241
def testSingleString(self):
242
self.lp.msg("Hello, world.")
243
self.assertEquals(len(self.out), 1)
246
def testMultipleString(self):
247
# Test some stupid behavior that will be deprecated real soon.
248
# If you are reading this and trying to learn how the logging
249
# system works, *do not use this feature*.
250
self.lp.msg("Hello, ", "world.")
251
self.assertEquals(len(self.out), 1)
254
def testSingleUnicode(self):
255
self.lp.msg(u"Hello, \N{VULGAR FRACTION ONE HALF} world.")
256
self.assertEquals(len(self.out), 1)
257
self.assertIn('with str error', self.out[0])
258
self.assertIn('UnicodeEncodeError', self.out[0])
262
class FileObserverTestCase(LogPublisherTestCaseMixin, unittest.TestCase):
263
def test_getTimezoneOffset(self):
265
Attempt to verify that L{FileLogObserver.getTimezoneOffset} returns
266
correct values for the current C{TZ} environment setting. Do this
267
by setting C{TZ} to various well-known values and asserting that the
268
reported offset is correct.
270
localDaylightTuple = (2006, 6, 30, 0, 0, 0, 4, 181, 1)
271
utcDaylightTimestamp = time.mktime(localDaylightTuple)
272
localStandardTuple = (2007, 1, 31, 0, 0, 0, 2, 31, 0)
273
utcStandardTimestamp = time.mktime(localStandardTuple)
275
originalTimezone = os.environ.get('TZ', None)
277
# Test something west of UTC
278
os.environ['TZ'] = 'America/New_York'
281
self.flo.getTimezoneOffset(utcDaylightTimestamp),
284
self.flo.getTimezoneOffset(utcStandardTimestamp),
287
# Test something east of UTC
288
os.environ['TZ'] = 'Europe/Berlin'
291
self.flo.getTimezoneOffset(utcDaylightTimestamp),
294
self.flo.getTimezoneOffset(utcStandardTimestamp),
297
# Test a timezone that doesn't have DST
298
os.environ['TZ'] = 'Africa/Johannesburg'
301
self.flo.getTimezoneOffset(utcDaylightTimestamp),
304
self.flo.getTimezoneOffset(utcStandardTimestamp),
307
if originalTimezone is None:
310
os.environ['TZ'] = originalTimezone
312
if getattr(time, 'tzset', None) is None:
313
test_getTimezoneOffset.skip = (
314
"Platform cannot change timezone, cannot verify correct offsets "
315
"in well-known timezones.")
318
def test_timeFormatting(self):
320
Test the method of L{FileLogObserver} which turns a timestamp into a
321
human-readable string.
323
# There is no function in the time module which converts a UTC time
324
# tuple to a timestamp.
325
when = time.mktime((2001, 2, 3, 4, 5, 6, 7, 8, 0)) - time.timezone
327
# Pretend to be in US/Eastern for a moment
328
self.flo.getTimezoneOffset = lambda when: 18000
329
self.assertEquals(self.flo.formatTime(when), '2001-02-02 23:05:06-0500')
331
# Okay now we're in Eastern Europe somewhere
332
self.flo.getTimezoneOffset = lambda when: -3600
333
self.assertEquals(self.flo.formatTime(when), '2001-02-03 05:05:06+0100')
335
# And off in the Pacific or someplace like that
336
self.flo.getTimezoneOffset = lambda when: -39600
337
self.assertEquals(self.flo.formatTime(when), '2001-02-03 15:05:06+1100')
339
# One of those weird places with a half-hour offset timezone
340
self.flo.getTimezoneOffset = lambda when: 5400
341
self.assertEquals(self.flo.formatTime(when), '2001-02-03 02:35:06-0130')
343
# Half-hour offset in the other direction
344
self.flo.getTimezoneOffset = lambda when: -5400
345
self.assertEquals(self.flo.formatTime(when), '2001-02-03 05:35:06+0130')
347
# Test an offset which is between 0 and 60 minutes to make sure the
348
# sign comes out properly in that case.
349
self.flo.getTimezoneOffset = lambda when: 1800
350
self.assertEquals(self.flo.formatTime(when), '2001-02-03 03:35:06-0030')
352
# Test an offset between 0 and 60 minutes in the other direction.
353
self.flo.getTimezoneOffset = lambda when: -1800
354
self.assertEquals(self.flo.formatTime(when), '2001-02-03 04:35:06+0030')
356
# If a strftime-format string is present on the logger, it should
357
# use that instead. Note we don't assert anything about day, hour
358
# or minute because we cannot easily control what time.strftime()
359
# thinks the local timezone is.
360
self.flo.timeFormat = '%Y %m'
361
self.assertEquals(self.flo.formatTime(when), '2001 02')
364
def test_loggingAnObjectWithBroken__str__(self):
366
self.lp.msg(EvilStr())
367
self.assertEquals(len(self.out), 1)
368
# Logging system shouldn't need to crap itself for this trivial case
369
self.assertNotIn('UNFORMATTABLE', self.out[0])
372
def test_formattingAnObjectWithBroken__str__(self):
373
self.lp.msg(format='%(blat)s', blat=EvilStr())
374
self.assertEquals(len(self.out), 1)
375
self.assertIn('Invalid format string or unformattable object', self.out[0])
378
def test_brokenSystem__str__(self):
379
self.lp.msg('huh', system=EvilStr())
380
self.assertEquals(len(self.out), 1)
381
self.assertIn('Invalid format string or unformattable object', self.out[0])
384
def test_formattingAnObjectWithBroken__repr__Indirect(self):
385
self.lp.msg(format='%(blat)s', blat=[EvilRepr()])
386
self.assertEquals(len(self.out), 1)
387
self.assertIn('UNFORMATTABLE OBJECT', self.out[0])
390
def test_systemWithBroker__repr__Indirect(self):
391
self.lp.msg('huh', system=[EvilRepr()])
392
self.assertEquals(len(self.out), 1)
393
self.assertIn('UNFORMATTABLE OBJECT', self.out[0])
396
def test_simpleBrokenFormat(self):
397
self.lp.msg(format='hooj %s %s', blat=1)
398
self.assertEquals(len(self.out), 1)
399
self.assertIn('Invalid format string or unformattable object', self.out[0])
402
def test_ridiculousFormat(self):
403
self.lp.msg(format=42, blat=1)
404
self.assertEquals(len(self.out), 1)
405
self.assertIn('Invalid format string or unformattable object', self.out[0])
408
def test_evilFormat__repr__And__str__(self):
409
self.lp.msg(format=EvilReprStr(), blat=1)
410
self.assertEquals(len(self.out), 1)
411
self.assertIn('PATHOLOGICAL', self.out[0])
414
def test_strangeEventDict(self):
416
This kind of eventDict used to fail silently, so test it does.
418
self.lp.msg(message='', isError=False)
419
self.assertEquals(len(self.out), 0)
422
def test_startLoggingTwice(self):
424
There are some obscure error conditions that can occur when logging is
425
started twice. See http://twistedmatrix.com/trac/ticket/3289 for more
428
# The bug is particular to the way that the t.p.log 'global' function
429
# handle stdout. If we use our own stream, the error doesn't occur. If
430
# we use our own LogPublisher, the error doesn't occur.
431
sys.stdout = StringIO()
432
self.addCleanup(setattr, sys, 'stdout', sys.__stdout__)
434
def showError(eventDict):
435
if eventDict['isError']:
436
sys.__stdout__.write(eventDict['failure'].getTraceback())
438
log.addObserver(showError)
439
self.addCleanup(log.removeObserver, showError)
440
observer = log.startLogging(sys.stdout)
441
self.addCleanup(observer.stop)
442
# At this point, we expect that sys.stdout is a StdioOnnaStick object.
443
self.assertIsInstance(sys.stdout, log.StdioOnnaStick)
444
fakeStdout = sys.stdout
445
observer = log.startLogging(sys.stdout)
446
self.assertIdentical(sys.stdout, fakeStdout)
449
class PythonLoggingObserverTestCase(unittest.TestCase):
451
Test the bridge with python logging module.
454
self.out = StringIO()
456
rootLogger = logging.getLogger("")
457
self.originalLevel = rootLogger.getEffectiveLevel()
458
rootLogger.setLevel(logging.DEBUG)
459
self.hdlr = logging.StreamHandler(self.out)
460
fmt = logging.Formatter(logging.BASIC_FORMAT)
461
self.hdlr.setFormatter(fmt)
462
rootLogger.addHandler(self.hdlr)
464
self.lp = log.LogPublisher()
465
self.obs = log.PythonLoggingObserver()
466
self.lp.addObserver(self.obs.emit)
469
rootLogger = logging.getLogger("")
470
rootLogger.removeHandler(self.hdlr)
471
rootLogger.setLevel(self.originalLevel)
474
def test_singleString(self):
476
Test simple output, and default log level.
478
self.lp.msg("Hello, world.")
479
self.assertIn("Hello, world.", self.out.getvalue())
480
self.assertIn("INFO", self.out.getvalue())
482
def test_errorString(self):
486
self.lp.msg(failure=failure.Failure(ValueError("That is bad.")), isError=True)
487
self.assertIn("ERROR", self.out.getvalue())
489
def test_formatString(self):
491
Test logging with a format.
493
self.lp.msg(format="%(bar)s oo %(foo)s", bar="Hello", foo="world")
494
self.assertIn("Hello oo world", self.out.getvalue())
496
def test_customLevel(self):
498
Test the logLevel keyword for customizing level used.
500
self.lp.msg("Spam egg.", logLevel=logging.DEBUG)
501
self.assertIn("Spam egg.", self.out.getvalue())
502
self.assertIn("DEBUG", self.out.getvalue())
504
self.lp.msg("Foo bar.", logLevel=logging.WARNING)
505
self.assertIn("Foo bar.", self.out.getvalue())
506
self.assertIn("WARNING", self.out.getvalue())
508
def test_strangeEventDict(self):
510
Verify that an event dictionary which is not an error and has an empty
511
message isn't recorded.
513
self.lp.msg(message='', isError=False)
514
self.assertEquals(self.out.getvalue(), '')
517
class PythonLoggingIntegrationTestCase(unittest.TestCase):
519
Test integration of python logging bridge.
521
def test_startStopObserver(self):
523
Test that start and stop methods of the observer actually register
524
and unregister to the log system.
526
oldAddObserver = log.addObserver
527
oldRemoveObserver = log.removeObserver
530
log.addObserver = l.append
531
log.removeObserver = l.remove
532
obs = log.PythonLoggingObserver()
534
self.assertEquals(l[0], obs.emit)
536
self.assertEquals(len(l), 0)
538
log.addObserver = oldAddObserver
539
log.removeObserver = oldRemoveObserver
541
def test_inheritance(self):
543
Test that we can inherit L{log.PythonLoggingObserver} and use super:
544
that's basically a validation that L{log.PythonLoggingObserver} is
547
class MyObserver(log.PythonLoggingObserver):
548
def emit(self, eventDict):
549
super(MyObserver, self).emit(eventDict)
552
oldEmit = log.PythonLoggingObserver.emit
554
log.PythonLoggingObserver.emit = l.append
556
self.assertEquals(len(l), 1)
558
log.PythonLoggingObserver.emit = oldEmit