3
# Copyright 2001-2009 by Vinay Sajip. All Rights Reserved.
5
# Permission to use, copy, modify, and distribute this software and its
6
# documentation for any purpose and without fee is hereby granted,
7
# provided that the above copyright notice appear in all copies and that
8
# both that copyright notice and this permission notice appear in
9
# supporting documentation, and that the name of Vinay Sajip
10
# not be used in advertising or publicity pertaining to distribution
11
# of the software without specific, written prior permission.
12
# VINAY SAJIP DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING
13
# ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
14
# VINAY SAJIP BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR
15
# ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER
16
# IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
17
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
19
"""Test harness for the logging module. Run all tests.
21
Copyright (C) 2001-2009 Vinay Sajip. All Rights Reserved.
25
import logging.handlers
37
from socketserver import ThreadingTCPServer, StreamRequestHandler
42
from test.support import captured_stdout, run_with_locale, run_unittest
52
class BaseTest(unittest.TestCase):
54
"""Base class for logging tests."""
56
log_format = "%(name)s -> %(levelname)s: %(message)s"
57
expected_log_pat = r"^([\w.]+) -> ([\w]+): ([\d]+)$"
61
"""Setup the default logging stream to an internal StringIO instance,
62
so that we can examine log output as we want."""
63
logger_dict = logging.getLogger().manager.loggerDict
64
logging._acquireLock()
66
self.saved_handlers = logging._handlers.copy()
67
self.saved_handler_list = logging._handlerList[:]
68
self.saved_loggers = logger_dict.copy()
69
self.saved_level_names = logging._levelNames.copy()
71
logging._releaseLock()
73
self.root_logger = logging.getLogger("")
74
self.original_logging_level = self.root_logger.getEffectiveLevel()
76
self.stream = io.StringIO()
77
self.root_logger.setLevel(logging.DEBUG)
78
self.root_hdlr = logging.StreamHandler(self.stream)
79
self.root_formatter = logging.Formatter(self.log_format)
80
self.root_hdlr.setFormatter(self.root_formatter)
81
self.root_logger.addHandler(self.root_hdlr)
84
"""Remove our logging stream, and restore the original logging
87
self.root_logger.removeHandler(self.root_hdlr)
88
self.root_logger.setLevel(self.original_logging_level)
89
logging._acquireLock()
91
logging._levelNames.clear()
92
logging._levelNames.update(self.saved_level_names)
93
logging._handlers.clear()
94
logging._handlers.update(self.saved_handlers)
95
logging._handlerList[:] = self.saved_handler_list
96
loggerDict = logging.getLogger().manager.loggerDict
98
loggerDict.update(self.saved_loggers)
100
logging._releaseLock()
102
def assert_log_lines(self, expected_values, stream=None):
103
"""Match the collected log lines against the regular expression
104
self.expected_log_pat, and compare the extracted group values to
105
the expected_values list of tuples."""
106
stream = stream or self.stream
107
pat = re.compile(self.expected_log_pat)
110
actual_lines = stream.readlines()
111
except AttributeError:
112
# StringIO.StringIO lacks a reset() method.
113
actual_lines = stream.getvalue().splitlines()
114
self.assertEquals(len(actual_lines), len(expected_values))
115
for actual, expected in zip(actual_lines, expected_values):
116
match = pat.search(actual)
118
self.fail("Log line does not match expected pattern:\n" +
120
self.assertEquals(tuple(match.groups()), expected)
123
self.fail("Remaining output at end of log stream:\n" + s)
125
def next_message(self):
126
"""Generate a message consisting solely of an auto-incrementing
128
self.message_num += 1
129
return "%d" % self.message_num
132
class BuiltinLevelsTest(BaseTest):
133
"""Test builtin levels and their inheritance."""
136
#Logging levels in a flat logger namespace.
137
m = self.next_message
139
ERR = logging.getLogger("ERR")
140
ERR.setLevel(logging.ERROR)
141
INF = logging.getLogger("INF")
142
INF.setLevel(logging.INFO)
143
DEB = logging.getLogger("DEB")
144
DEB.setLevel(logging.DEBUG)
147
ERR.log(logging.CRITICAL, m())
150
INF.log(logging.CRITICAL, m())
155
DEB.log(logging.CRITICAL, m())
161
# These should not log.
168
self.assert_log_lines([
169
('ERR', 'CRITICAL', '1'),
170
('ERR', 'ERROR', '2'),
171
('INF', 'CRITICAL', '3'),
172
('INF', 'ERROR', '4'),
173
('INF', 'WARNING', '5'),
174
('INF', 'INFO', '6'),
175
('DEB', 'CRITICAL', '7'),
176
('DEB', 'ERROR', '8'),
177
('DEB', 'WARNING', '9'),
178
('DEB', 'INFO', '10'),
179
('DEB', 'DEBUG', '11'),
182
def test_nested_explicit(self):
183
# Logging levels in a nested namespace, all explicitly set.
184
m = self.next_message
186
INF = logging.getLogger("INF")
187
INF.setLevel(logging.INFO)
188
INF_ERR = logging.getLogger("INF.ERR")
189
INF_ERR.setLevel(logging.ERROR)
192
INF_ERR.log(logging.CRITICAL, m())
195
# These should not log.
200
self.assert_log_lines([
201
('INF.ERR', 'CRITICAL', '1'),
202
('INF.ERR', 'ERROR', '2'),
205
def test_nested_inherited(self):
206
#Logging levels in a nested namespace, inherited from parent loggers.
207
m = self.next_message
209
INF = logging.getLogger("INF")
210
INF.setLevel(logging.INFO)
211
INF_ERR = logging.getLogger("INF.ERR")
212
INF_ERR.setLevel(logging.ERROR)
213
INF_UNDEF = logging.getLogger("INF.UNDEF")
214
INF_ERR_UNDEF = logging.getLogger("INF.ERR.UNDEF")
215
UNDEF = logging.getLogger("UNDEF")
218
INF_UNDEF.log(logging.CRITICAL, m())
222
INF_ERR_UNDEF.log(logging.CRITICAL, m())
223
INF_ERR_UNDEF.error(m())
225
# These should not log.
227
INF_ERR_UNDEF.warn(m())
228
INF_ERR_UNDEF.info(m())
229
INF_ERR_UNDEF.debug(m())
231
self.assert_log_lines([
232
('INF.UNDEF', 'CRITICAL', '1'),
233
('INF.UNDEF', 'ERROR', '2'),
234
('INF.UNDEF', 'WARNING', '3'),
235
('INF.UNDEF', 'INFO', '4'),
236
('INF.ERR.UNDEF', 'CRITICAL', '5'),
237
('INF.ERR.UNDEF', 'ERROR', '6'),
240
def test_nested_with_virtual_parent(self):
241
# Logging levels when some parent does not exist yet.
242
m = self.next_message
244
INF = logging.getLogger("INF")
245
GRANDCHILD = logging.getLogger("INF.BADPARENT.UNDEF")
246
CHILD = logging.getLogger("INF.BADPARENT")
247
INF.setLevel(logging.INFO)
250
GRANDCHILD.log(logging.FATAL, m())
252
CHILD.log(logging.FATAL, m())
255
# These should not log.
256
GRANDCHILD.debug(m())
259
self.assert_log_lines([
260
('INF.BADPARENT.UNDEF', 'CRITICAL', '1'),
261
('INF.BADPARENT.UNDEF', 'INFO', '2'),
262
('INF.BADPARENT', 'CRITICAL', '3'),
263
('INF.BADPARENT', 'INFO', '4'),
267
class BasicFilterTest(BaseTest):
269
"""Test the bundled Filter class."""
271
def test_filter(self):
272
# Only messages satisfying the specified criteria pass through the
274
filter_ = logging.Filter("spam.eggs")
275
handler = self.root_logger.handlers[0]
277
handler.addFilter(filter_)
278
spam = logging.getLogger("spam")
279
spam_eggs = logging.getLogger("spam.eggs")
280
spam_eggs_fish = logging.getLogger("spam.eggs.fish")
281
spam_bakedbeans = logging.getLogger("spam.bakedbeans")
283
spam.info(self.next_message())
284
spam_eggs.info(self.next_message()) # Good.
285
spam_eggs_fish.info(self.next_message()) # Good.
286
spam_bakedbeans.info(self.next_message())
288
self.assert_log_lines([
289
('spam.eggs', 'INFO', '2'),
290
('spam.eggs.fish', 'INFO', '3'),
293
handler.removeFilter(filter_)
297
# First, we define our levels. There can be as many as you want - the only
298
# limitations are that they should be integers, the lowest should be > 0 and
299
# larger values mean less information being logged. If you need specific
300
# level values which do not fit into these limitations, you can use a
301
# mapping dictionary to convert between your application levels and the
315
LEVEL_RANGE = range(BORING, SILENT + 1)
318
# Next, we define names for our levels. You don't need to do this - in which
319
# case the system will use "Level n" to denote the text for the level.
321
my_logging_levels = {
323
TACITURN : 'Taciturn',
325
EFFUSIVE : 'Effusive',
326
SOCIABLE : 'Sociable',
328
TALKATIVE : 'Talkative',
329
GARRULOUS : 'Garrulous',
330
CHATTERBOX : 'Chatterbox',
334
class GarrulousFilter(logging.Filter):
336
"""A filter which blocks garrulous messages."""
338
def filter(self, record):
339
return record.levelno != GARRULOUS
341
class VerySpecificFilter(logging.Filter):
343
"""A filter which blocks sociable and taciturn messages."""
345
def filter(self, record):
346
return record.levelno not in [SOCIABLE, TACITURN]
349
class CustomLevelsAndFiltersTest(BaseTest):
351
"""Test various filtering possibilities with custom logging levels."""
353
# Skip the logger name group.
354
expected_log_pat = r"^[\w.]+ -> ([\w]+): ([\d]+)$"
358
for k, v in list(my_logging_levels.items()):
359
logging.addLevelName(k, v)
361
def log_at_all_levels(self, logger):
362
for lvl in LEVEL_RANGE:
363
logger.log(lvl, self.next_message())
365
def test_logger_filter(self):
366
# Filter at logger level.
367
self.root_logger.setLevel(VERBOSE)
368
# Levels >= 'Verbose' are good.
369
self.log_at_all_levels(self.root_logger)
370
self.assert_log_lines([
379
def test_handler_filter(self):
380
# Filter at handler level.
381
self.root_logger.handlers[0].setLevel(SOCIABLE)
383
# Levels >= 'Sociable' are good.
384
self.log_at_all_levels(self.root_logger)
385
self.assert_log_lines([
393
self.root_logger.handlers[0].setLevel(logging.NOTSET)
395
def test_specific_filters(self):
396
# Set a specific filter object on the handler, and then add another
397
# filter object on the logger itself.
398
handler = self.root_logger.handlers[0]
399
specific_filter = None
400
garr = GarrulousFilter()
401
handler.addFilter(garr)
403
self.log_at_all_levels(self.root_logger)
405
# Notice how 'Garrulous' is missing
416
self.assert_log_lines(first_lines)
418
specific_filter = VerySpecificFilter()
419
self.root_logger.addFilter(specific_filter)
420
self.log_at_all_levels(self.root_logger)
421
self.assert_log_lines(first_lines + [
422
# Not only 'Garrulous' is still missing, but also 'Sociable'
425
('Chatterbox', '12'),
434
self.root_logger.removeFilter(specific_filter)
435
handler.removeFilter(garr)
438
class MemoryHandlerTest(BaseTest):
440
"""Tests for the MemoryHandler."""
442
# Do not bother with a logger name group.
443
expected_log_pat = r"^[\w.]+ -> ([\w]+): ([\d]+)$"
447
self.mem_hdlr = logging.handlers.MemoryHandler(10, logging.WARNING,
449
self.mem_logger = logging.getLogger('mem')
450
self.mem_logger.propagate = 0
451
self.mem_logger.addHandler(self.mem_hdlr)
454
self.mem_hdlr.close()
455
BaseTest.tearDown(self)
457
def test_flush(self):
458
# The memory handler flushes to its target handler based on specific
459
# criteria (message count and message level).
460
self.mem_logger.debug(self.next_message())
461
self.assert_log_lines([])
462
self.mem_logger.info(self.next_message())
463
self.assert_log_lines([])
464
# This will flush because the level is >= logging.WARNING
465
self.mem_logger.warn(self.next_message())
471
self.assert_log_lines(lines)
474
self.mem_logger.debug(self.next_message())
475
self.assert_log_lines(lines)
476
# This will flush because it's the 10th message since the last
478
self.mem_logger.debug(self.next_message())
479
lines = lines + [('DEBUG', str(i)) for i in range(n, n + 10)]
480
self.assert_log_lines(lines)
482
self.mem_logger.debug(self.next_message())
483
self.assert_log_lines(lines)
486
class ExceptionFormatter(logging.Formatter):
487
"""A special exception formatter."""
488
def formatException(self, ei):
489
return "Got a [%s]" % ei[0].__name__
492
class ConfigFileTest(BaseTest):
494
"""Reading logging config from a .ini-style config file."""
496
expected_log_pat = r"^([\w]+) \+\+ ([\w]+)$"
498
# config0 is a standard configuration.
520
format=%(levelname)s ++ %(message)s
524
# config1 adds a little to the standard configuration.
543
qualname=compiler.parser
552
format=%(levelname)s ++ %(message)s
556
# config2 has a subtle configuration error that should be reported
557
config2 = config1.replace("sys.stdout", "sys.stbout")
559
# config3 has a less subtle configuration error
560
config3 = config1.replace("formatter=form1", "formatter=misspelled_name")
562
# config4 specifies a custom formatter class to be loaded
584
class=""" + __name__ + """.ExceptionFormatter
585
format=%(levelname)s:%(name)s:%(message)s
589
# config5 specifies a custom handler class to be loaded
590
config5 = config1.replace('class=StreamHandler', 'class=logging.StreamHandler')
592
# config6 uses ', ' delimiters in the handlers and formatters sections
611
qualname=compiler.parser
626
format=%(levelname)s ++ %(message)s
634
def apply_config(self, conf):
636
fn = tempfile.mktemp(".ini")
638
f.write(textwrap.dedent(conf))
640
logging.config.fileConfig(fn)
644
def test_config0_ok(self):
645
# A simple config file which overrides the default settings.
646
with captured_stdout() as output:
647
self.apply_config(self.config0)
648
logger = logging.getLogger()
649
# Won't output anything
650
logger.info(self.next_message())
652
logger.error(self.next_message())
653
self.assert_log_lines([
656
# Original logger output is empty.
657
self.assert_log_lines([])
659
def test_config1_ok(self, config=config1):
660
# A config file defining a sub-parser as well.
661
with captured_stdout() as output:
662
self.apply_config(config)
663
logger = logging.getLogger("compiler.parser")
664
# Both will output a message
665
logger.info(self.next_message())
666
logger.error(self.next_message())
667
self.assert_log_lines([
671
# Original logger output is empty.
672
self.assert_log_lines([])
674
def test_config2_failure(self):
675
# A simple config file which overrides the default settings.
676
self.assertRaises(Exception, self.apply_config, self.config2)
678
def test_config3_failure(self):
679
# A simple config file which overrides the default settings.
680
self.assertRaises(Exception, self.apply_config, self.config3)
682
def test_config4_ok(self):
683
# A config file specifying a custom formatter class.
684
with captured_stdout() as output:
685
self.apply_config(self.config4)
686
logger = logging.getLogger()
690
logging.exception("just testing")
692
self.assertEquals(output.getvalue(),
693
"ERROR:root:just testing\nGot a [RuntimeError]\n")
694
# Original logger output is empty
695
self.assert_log_lines([])
697
def test_config5_ok(self):
698
self.test_config1_ok(config=self.config5)
700
def test_config6_ok(self):
701
self.test_config1_ok(config=self.config6)
703
class LogRecordStreamHandler(StreamRequestHandler):
705
"""Handler for a streaming logging request. It saves the log message in the
706
TCP server's 'log_output' attribute."""
708
TCP_LOG_END = "!!!END!!!"
711
"""Handle multiple requests - each expected to be of 4-byte length,
712
followed by the LogRecord in pickle format. Logs the record
713
according to whatever policy is configured locally."""
715
chunk = self.connection.recv(4)
718
slen = struct.unpack(">L", chunk)[0]
719
chunk = self.connection.recv(slen)
720
while len(chunk) < slen:
721
chunk = chunk + self.connection.recv(slen - len(chunk))
722
obj = self.unpickle(chunk)
723
record = logging.makeLogRecord(obj)
724
self.handle_log_record(record)
726
def unpickle(self, data):
727
return pickle.loads(data)
729
def handle_log_record(self, record):
730
# If the end-of-messages sentinel is seen, tell the server to
732
if self.TCP_LOG_END in record.msg:
733
self.server.abort = 1
735
self.server.log_output += record.msg + "\n"
738
class LogRecordSocketReceiver(ThreadingTCPServer):
740
"""A simple-minded TCP socket-based logging receiver suitable for test
743
allow_reuse_address = 1
746
def __init__(self, host='localhost',
747
port=logging.handlers.DEFAULT_TCP_LOGGING_PORT,
748
handler=LogRecordStreamHandler):
749
ThreadingTCPServer.__init__(self, (host, port), handler)
752
self.finished = threading.Event()
754
def serve_until_stopped(self):
755
while not self.abort:
756
rd, wr, ex = select.select([self.socket.fileno()], [], [],
759
self.handle_request()
760
# Notify the main thread that we're about to exit
762
# close the listen socket
766
class SocketHandlerTest(BaseTest):
768
"""Test for SocketHandler objects."""
771
"""Set up a TCP server to receive log messages, and a SocketHandler
772
pointing to that server's address and port."""
774
self.tcpserver = LogRecordSocketReceiver(port=0)
775
self.port = self.tcpserver.socket.getsockname()[1]
777
threading.Thread(target=self.tcpserver.serve_until_stopped)]
778
for thread in self.threads:
781
self.sock_hdlr = logging.handlers.SocketHandler('localhost', self.port)
782
self.sock_hdlr.setFormatter(self.root_formatter)
783
self.root_logger.removeHandler(self.root_logger.handlers[0])
784
self.root_logger.addHandler(self.sock_hdlr)
787
"""Shutdown the TCP server."""
789
self.tcpserver.abort = True
791
self.root_logger.removeHandler(self.sock_hdlr)
792
self.sock_hdlr.close()
793
for thread in self.threads:
796
BaseTest.tearDown(self)
798
def get_output(self):
799
"""Get the log output as received by the TCP server."""
800
# Signal the TCP receiver and wait for it to terminate.
801
self.root_logger.critical(LogRecordStreamHandler.TCP_LOG_END)
802
self.tcpserver.finished.wait(2.0)
803
return self.tcpserver.log_output
805
def test_output(self):
806
# The log message sent to the SocketHandler is properly received.
807
logger = logging.getLogger("tcp")
810
self.assertEquals(self.get_output(), "spam\neggs\n")
813
class MemoryTest(BaseTest):
815
"""Test memory persistence of logger objects."""
818
"""Create a dict to remember potentially destroyed objects."""
822
def _watch_for_survival(self, *args):
823
"""Watch the given objects for survival, by creating weakrefs to
826
key = id(obj), repr(obj)
827
self._survivors[key] = weakref.ref(obj)
829
def _assert_survival(self):
830
"""Assert that all objects watched for survival have survived."""
831
# Trigger cycle breaking.
834
for (id_, repr_), ref in list(self._survivors.items()):
838
self.fail("%d objects should have survived "
839
"but have been destroyed: %s" % (len(dead), ", ".join(dead)))
841
def test_persistent_loggers(self):
842
# Logger objects are persistent and retain their configuration, even
843
# if visible references are destroyed.
844
self.root_logger.setLevel(logging.INFO)
845
foo = logging.getLogger("foo")
846
self._watch_for_survival(foo)
847
foo.setLevel(logging.DEBUG)
848
self.root_logger.debug(self.next_message())
849
foo.debug(self.next_message())
850
self.assert_log_lines([
851
('foo', 'DEBUG', '2'),
855
self._assert_survival()
856
# foo has retained its settings.
857
bar = logging.getLogger("foo")
858
bar.debug(self.next_message())
859
self.assert_log_lines([
860
('foo', 'DEBUG', '2'),
861
('foo', 'DEBUG', '3'),
865
class EncodingTest(BaseTest):
866
def test_encoding_plain_file(self):
867
# In Python 2.x, a plain file object is treated as having no encoding.
868
log = logging.getLogger("test")
869
fn = tempfile.mktemp(".log")
870
# the non-ascii data we write to the log.
873
handler = logging.FileHandler(fn, encoding="utf8")
874
log.addHandler(handler)
876
# write non-ascii data to the log.
879
log.removeHandler(handler)
881
# check we wrote exactly those bytes, ignoring trailing \n etc
882
f = open(fn, encoding="utf8")
884
self.failUnlessEqual(f.read().rstrip(), data)
888
if os.path.isfile(fn):
891
def test_encoding_cyrillic_unicode(self):
892
log = logging.getLogger("test")
893
#Get a message in Unicode: Do svidanya in Cyrillic (meaning goodbye)
894
message = '\u0434\u043e \u0441\u0432\u0438\u0434\u0430\u043d\u0438\u044f'
895
#Ensure it's written in a Cyrillic encoding
896
writer_class = codecs.getwriter('cp1251')
897
stream = io.BytesIO()
898
writer = writer_class(stream, 'strict')
899
handler = logging.StreamHandler(writer)
900
log.addHandler(handler)
904
log.removeHandler(handler)
906
# check we wrote exactly those bytes, ignoring trailing \n etc
907
s = stream.getvalue()
908
#Compare against what the data should be when encoded in CP-1251
909
self.assertEqual(s, b'\xe4\xee \xf1\xe2\xe8\xe4\xe0\xed\xe8\xff\n')
912
class WarningsTest(BaseTest):
913
def test_warnings(self):
914
logging.captureWarnings(True)
915
warnings.filterwarnings("always", category=UserWarning)
918
h = logging.StreamHandler(file)
919
logger = logging.getLogger("py.warnings")
921
warnings.warn("I'm warning you...")
922
logger.removeHandler(h)
925
self.assertTrue(s.find("UserWarning: I'm warning you...\n") > 0)
927
#See if an explicit file uses the original implementation
929
warnings.showwarning("Explicit", UserWarning, "dummy.py", 42, file,
933
self.assertEqual(s, "dummy.py:42: UserWarning: Explicit\n Dummy line\n")
935
warnings.resetwarnings()
936
logging.captureWarnings(False)
938
# Set the locale to the platform-dependent default. I have no idea
939
# why the test does this, but in any case we save the current locale
940
# first and restore it at the end.
941
@run_with_locale('LC_ALL', '')
943
run_unittest(BuiltinLevelsTest, BasicFilterTest,
944
CustomLevelsAndFiltersTest, MemoryHandlerTest,
945
ConfigFileTest, SocketHandlerTest, MemoryTest,
946
EncodingTest, WarningsTest)
948
if __name__ == "__main__":