~ubuntu-branches/ubuntu/maverick/python3.1/maverick

« back to all changes in this revision

Viewing changes to Lib/test/test_logging.py

  • Committer: Bazaar Package Importer
  • Author(s): Matthias Klose
  • Date: 2009-03-23 00:01:27 UTC
  • Revision ID: james.westby@ubuntu.com-20090323000127-5fstfxju4ufrhthq
Tags: upstream-3.1~a1+20090322
ImportĀ upstreamĀ versionĀ 3.1~a1+20090322

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#!/usr/bin/env python
 
2
#
 
3
# Copyright 2001-2009 by Vinay Sajip. All Rights Reserved.
 
4
#
 
5
# Permission to use, copy, modify, and distribute this software and its
 
6
# documentation for any purpose and without fee is hereby granted,
 
7
# provided that the above copyright notice appear in all copies and that
 
8
# both that copyright notice and this permission notice appear in
 
9
# supporting documentation, and that the name of Vinay Sajip
 
10
# not be used in advertising or publicity pertaining to distribution
 
11
# of the software without specific, written prior permission.
 
12
# VINAY SAJIP DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING
 
13
# ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
 
14
# VINAY SAJIP BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR
 
15
# ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER
 
16
# IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
 
17
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 
18
 
 
19
"""Test harness for the logging module. Run all tests.
 
20
 
 
21
Copyright (C) 2001-2009 Vinay Sajip. All Rights Reserved.
 
22
"""
 
23
 
 
24
import logging
 
25
import logging.handlers
 
26
import logging.config
 
27
 
 
28
import codecs
 
29
import copy
 
30
import pickle
 
31
import io
 
32
import gc
 
33
import os
 
34
import re
 
35
import select
 
36
import socket
 
37
from socketserver import ThreadingTCPServer, StreamRequestHandler
 
38
import string
 
39
import struct
 
40
import sys
 
41
import tempfile
 
42
from test.support import captured_stdout, run_with_locale, run_unittest
 
43
import textwrap
 
44
import threading
 
45
import time
 
46
import types
 
47
import unittest
 
48
import warnings
 
49
import weakref
 
50
 
 
51
 
 
52
class BaseTest(unittest.TestCase):
 
53
 
 
54
    """Base class for logging tests."""
 
55
 
 
56
    log_format = "%(name)s -> %(levelname)s: %(message)s"
 
57
    expected_log_pat = r"^([\w.]+) -> ([\w]+): ([\d]+)$"
 
58
    message_num = 0
 
59
 
 
60
    def setUp(self):
 
61
        """Setup the default logging stream to an internal StringIO instance,
 
62
        so that we can examine log output as we want."""
 
63
        logger_dict = logging.getLogger().manager.loggerDict
 
64
        logging._acquireLock()
 
65
        try:
 
66
            self.saved_handlers = logging._handlers.copy()
 
67
            self.saved_handler_list = logging._handlerList[:]
 
68
            self.saved_loggers = logger_dict.copy()
 
69
            self.saved_level_names = logging._levelNames.copy()
 
70
        finally:
 
71
            logging._releaseLock()
 
72
 
 
73
        self.root_logger = logging.getLogger("")
 
74
        self.original_logging_level = self.root_logger.getEffectiveLevel()
 
75
 
 
76
        self.stream = io.StringIO()
 
77
        self.root_logger.setLevel(logging.DEBUG)
 
78
        self.root_hdlr = logging.StreamHandler(self.stream)
 
79
        self.root_formatter = logging.Formatter(self.log_format)
 
80
        self.root_hdlr.setFormatter(self.root_formatter)
 
81
        self.root_logger.addHandler(self.root_hdlr)
 
82
 
 
83
    def tearDown(self):
 
84
        """Remove our logging stream, and restore the original logging
 
85
        level."""
 
86
        self.stream.close()
 
87
        self.root_logger.removeHandler(self.root_hdlr)
 
88
        self.root_logger.setLevel(self.original_logging_level)
 
89
        logging._acquireLock()
 
90
        try:
 
91
            logging._levelNames.clear()
 
92
            logging._levelNames.update(self.saved_level_names)
 
93
            logging._handlers.clear()
 
94
            logging._handlers.update(self.saved_handlers)
 
95
            logging._handlerList[:] = self.saved_handler_list
 
96
            loggerDict = logging.getLogger().manager.loggerDict
 
97
            loggerDict.clear()
 
98
            loggerDict.update(self.saved_loggers)
 
99
        finally:
 
100
            logging._releaseLock()
 
101
 
 
102
    def assert_log_lines(self, expected_values, stream=None):
 
103
        """Match the collected log lines against the regular expression
 
104
        self.expected_log_pat, and compare the extracted group values to
 
105
        the expected_values list of tuples."""
 
106
        stream = stream or self.stream
 
107
        pat = re.compile(self.expected_log_pat)
 
108
        try:
 
109
            stream.reset()
 
110
            actual_lines = stream.readlines()
 
111
        except AttributeError:
 
112
            # StringIO.StringIO lacks a reset() method.
 
113
            actual_lines = stream.getvalue().splitlines()
 
114
        self.assertEquals(len(actual_lines), len(expected_values))
 
115
        for actual, expected in zip(actual_lines, expected_values):
 
116
            match = pat.search(actual)
 
117
            if not match:
 
118
                self.fail("Log line does not match expected pattern:\n" +
 
119
                            actual)
 
120
            self.assertEquals(tuple(match.groups()), expected)
 
121
        s = stream.read()
 
122
        if s:
 
123
            self.fail("Remaining output at end of log stream:\n" + s)
 
124
 
 
125
    def next_message(self):
 
126
        """Generate a message consisting solely of an auto-incrementing
 
127
        integer."""
 
128
        self.message_num += 1
 
129
        return "%d" % self.message_num
 
130
 
 
131
 
 
132
class BuiltinLevelsTest(BaseTest):
 
133
    """Test builtin levels and their inheritance."""
 
134
 
 
135
    def test_flat(self):
 
136
        #Logging levels in a flat logger namespace.
 
137
        m = self.next_message
 
138
 
 
139
        ERR = logging.getLogger("ERR")
 
140
        ERR.setLevel(logging.ERROR)
 
141
        INF = logging.getLogger("INF")
 
142
        INF.setLevel(logging.INFO)
 
143
        DEB = logging.getLogger("DEB")
 
144
        DEB.setLevel(logging.DEBUG)
 
145
 
 
146
        # These should log.
 
147
        ERR.log(logging.CRITICAL, m())
 
148
        ERR.error(m())
 
149
 
 
150
        INF.log(logging.CRITICAL, m())
 
151
        INF.error(m())
 
152
        INF.warn(m())
 
153
        INF.info(m())
 
154
 
 
155
        DEB.log(logging.CRITICAL, m())
 
156
        DEB.error(m())
 
157
        DEB.warn (m())
 
158
        DEB.info (m())
 
159
        DEB.debug(m())
 
160
 
 
161
        # These should not log.
 
162
        ERR.warn(m())
 
163
        ERR.info(m())
 
164
        ERR.debug(m())
 
165
 
 
166
        INF.debug(m())
 
167
 
 
168
        self.assert_log_lines([
 
169
            ('ERR', 'CRITICAL', '1'),
 
170
            ('ERR', 'ERROR', '2'),
 
171
            ('INF', 'CRITICAL', '3'),
 
172
            ('INF', 'ERROR', '4'),
 
173
            ('INF', 'WARNING', '5'),
 
174
            ('INF', 'INFO', '6'),
 
175
            ('DEB', 'CRITICAL', '7'),
 
176
            ('DEB', 'ERROR', '8'),
 
177
            ('DEB', 'WARNING', '9'),
 
178
            ('DEB', 'INFO', '10'),
 
179
            ('DEB', 'DEBUG', '11'),
 
180
        ])
 
181
 
 
182
    def test_nested_explicit(self):
 
183
        # Logging levels in a nested namespace, all explicitly set.
 
184
        m = self.next_message
 
185
 
 
186
        INF = logging.getLogger("INF")
 
187
        INF.setLevel(logging.INFO)
 
188
        INF_ERR  = logging.getLogger("INF.ERR")
 
189
        INF_ERR.setLevel(logging.ERROR)
 
190
 
 
191
        # These should log.
 
192
        INF_ERR.log(logging.CRITICAL, m())
 
193
        INF_ERR.error(m())
 
194
 
 
195
        # These should not log.
 
196
        INF_ERR.warn(m())
 
197
        INF_ERR.info(m())
 
198
        INF_ERR.debug(m())
 
199
 
 
200
        self.assert_log_lines([
 
201
            ('INF.ERR', 'CRITICAL', '1'),
 
202
            ('INF.ERR', 'ERROR', '2'),
 
203
        ])
 
204
 
 
205
    def test_nested_inherited(self):
 
206
        #Logging levels in a nested namespace, inherited from parent loggers.
 
207
        m = self.next_message
 
208
 
 
209
        INF = logging.getLogger("INF")
 
210
        INF.setLevel(logging.INFO)
 
211
        INF_ERR  = logging.getLogger("INF.ERR")
 
212
        INF_ERR.setLevel(logging.ERROR)
 
213
        INF_UNDEF = logging.getLogger("INF.UNDEF")
 
214
        INF_ERR_UNDEF = logging.getLogger("INF.ERR.UNDEF")
 
215
        UNDEF = logging.getLogger("UNDEF")
 
216
 
 
217
        # These should log.
 
218
        INF_UNDEF.log(logging.CRITICAL, m())
 
219
        INF_UNDEF.error(m())
 
220
        INF_UNDEF.warn(m())
 
221
        INF_UNDEF.info(m())
 
222
        INF_ERR_UNDEF.log(logging.CRITICAL, m())
 
223
        INF_ERR_UNDEF.error(m())
 
224
 
 
225
        # These should not log.
 
226
        INF_UNDEF.debug(m())
 
227
        INF_ERR_UNDEF.warn(m())
 
228
        INF_ERR_UNDEF.info(m())
 
229
        INF_ERR_UNDEF.debug(m())
 
230
 
 
231
        self.assert_log_lines([
 
232
            ('INF.UNDEF', 'CRITICAL', '1'),
 
233
            ('INF.UNDEF', 'ERROR', '2'),
 
234
            ('INF.UNDEF', 'WARNING', '3'),
 
235
            ('INF.UNDEF', 'INFO', '4'),
 
236
            ('INF.ERR.UNDEF', 'CRITICAL', '5'),
 
237
            ('INF.ERR.UNDEF', 'ERROR', '6'),
 
238
        ])
 
239
 
 
240
    def test_nested_with_virtual_parent(self):
 
241
        # Logging levels when some parent does not exist yet.
 
242
        m = self.next_message
 
243
 
 
244
        INF = logging.getLogger("INF")
 
245
        GRANDCHILD = logging.getLogger("INF.BADPARENT.UNDEF")
 
246
        CHILD = logging.getLogger("INF.BADPARENT")
 
247
        INF.setLevel(logging.INFO)
 
248
 
 
249
        # These should log.
 
250
        GRANDCHILD.log(logging.FATAL, m())
 
251
        GRANDCHILD.info(m())
 
252
        CHILD.log(logging.FATAL, m())
 
253
        CHILD.info(m())
 
254
 
 
255
        # These should not log.
 
256
        GRANDCHILD.debug(m())
 
257
        CHILD.debug(m())
 
258
 
 
259
        self.assert_log_lines([
 
260
            ('INF.BADPARENT.UNDEF', 'CRITICAL', '1'),
 
261
            ('INF.BADPARENT.UNDEF', 'INFO', '2'),
 
262
            ('INF.BADPARENT', 'CRITICAL', '3'),
 
263
            ('INF.BADPARENT', 'INFO', '4'),
 
264
        ])
 
265
 
 
266
 
 
267
class BasicFilterTest(BaseTest):
 
268
 
 
269
    """Test the bundled Filter class."""
 
270
 
 
271
    def test_filter(self):
 
272
        # Only messages satisfying the specified criteria pass through the
 
273
        #  filter.
 
274
        filter_ = logging.Filter("spam.eggs")
 
275
        handler = self.root_logger.handlers[0]
 
276
        try:
 
277
            handler.addFilter(filter_)
 
278
            spam = logging.getLogger("spam")
 
279
            spam_eggs = logging.getLogger("spam.eggs")
 
280
            spam_eggs_fish = logging.getLogger("spam.eggs.fish")
 
281
            spam_bakedbeans = logging.getLogger("spam.bakedbeans")
 
282
 
 
283
            spam.info(self.next_message())
 
284
            spam_eggs.info(self.next_message())  # Good.
 
285
            spam_eggs_fish.info(self.next_message())  # Good.
 
286
            spam_bakedbeans.info(self.next_message())
 
287
 
 
288
            self.assert_log_lines([
 
289
                ('spam.eggs', 'INFO', '2'),
 
290
                ('spam.eggs.fish', 'INFO', '3'),
 
291
            ])
 
292
        finally:
 
293
            handler.removeFilter(filter_)
 
294
 
 
295
 
 
296
#
 
297
#   First, we define our levels. There can be as many as you want - the only
 
298
#     limitations are that they should be integers, the lowest should be > 0 and
 
299
#   larger values mean less information being logged. If you need specific
 
300
#   level values which do not fit into these limitations, you can use a
 
301
#   mapping dictionary to convert between your application levels and the
 
302
#   logging system.
 
303
#
 
304
SILENT      = 120
 
305
TACITURN    = 119
 
306
TERSE       = 118
 
307
EFFUSIVE    = 117
 
308
SOCIABLE    = 116
 
309
VERBOSE     = 115
 
310
TALKATIVE   = 114
 
311
GARRULOUS   = 113
 
312
CHATTERBOX  = 112
 
313
BORING      = 111
 
314
 
 
315
LEVEL_RANGE = range(BORING, SILENT + 1)
 
316
 
 
317
#
 
318
#   Next, we define names for our levels. You don't need to do this - in which
 
319
#   case the system will use "Level n" to denote the text for the level.
 
320
#
 
321
my_logging_levels = {
 
322
    SILENT      : 'Silent',
 
323
    TACITURN    : 'Taciturn',
 
324
    TERSE       : 'Terse',
 
325
    EFFUSIVE    : 'Effusive',
 
326
    SOCIABLE    : 'Sociable',
 
327
    VERBOSE     : 'Verbose',
 
328
    TALKATIVE   : 'Talkative',
 
329
    GARRULOUS   : 'Garrulous',
 
330
    CHATTERBOX  : 'Chatterbox',
 
331
    BORING      : 'Boring',
 
332
}
 
333
 
 
334
class GarrulousFilter(logging.Filter):
 
335
 
 
336
    """A filter which blocks garrulous messages."""
 
337
 
 
338
    def filter(self, record):
 
339
        return record.levelno != GARRULOUS
 
340
 
 
341
class VerySpecificFilter(logging.Filter):
 
342
 
 
343
    """A filter which blocks sociable and taciturn messages."""
 
344
 
 
345
    def filter(self, record):
 
346
        return record.levelno not in [SOCIABLE, TACITURN]
 
347
 
 
348
 
 
349
class CustomLevelsAndFiltersTest(BaseTest):
 
350
 
 
351
    """Test various filtering possibilities with custom logging levels."""
 
352
 
 
353
    # Skip the logger name group.
 
354
    expected_log_pat = r"^[\w.]+ -> ([\w]+): ([\d]+)$"
 
355
 
 
356
    def setUp(self):
 
357
        BaseTest.setUp(self)
 
358
        for k, v in list(my_logging_levels.items()):
 
359
            logging.addLevelName(k, v)
 
360
 
 
361
    def log_at_all_levels(self, logger):
 
362
        for lvl in LEVEL_RANGE:
 
363
            logger.log(lvl, self.next_message())
 
364
 
 
365
    def test_logger_filter(self):
 
366
        # Filter at logger level.
 
367
        self.root_logger.setLevel(VERBOSE)
 
368
        # Levels >= 'Verbose' are good.
 
369
        self.log_at_all_levels(self.root_logger)
 
370
        self.assert_log_lines([
 
371
            ('Verbose', '5'),
 
372
            ('Sociable', '6'),
 
373
            ('Effusive', '7'),
 
374
            ('Terse', '8'),
 
375
            ('Taciturn', '9'),
 
376
            ('Silent', '10'),
 
377
        ])
 
378
 
 
379
    def test_handler_filter(self):
 
380
        # Filter at handler level.
 
381
        self.root_logger.handlers[0].setLevel(SOCIABLE)
 
382
        try:
 
383
            # Levels >= 'Sociable' are good.
 
384
            self.log_at_all_levels(self.root_logger)
 
385
            self.assert_log_lines([
 
386
                ('Sociable', '6'),
 
387
                ('Effusive', '7'),
 
388
                ('Terse', '8'),
 
389
                ('Taciturn', '9'),
 
390
                ('Silent', '10'),
 
391
            ])
 
392
        finally:
 
393
            self.root_logger.handlers[0].setLevel(logging.NOTSET)
 
394
 
 
395
    def test_specific_filters(self):
 
396
        # Set a specific filter object on the handler, and then add another
 
397
        #  filter object on the logger itself.
 
398
        handler = self.root_logger.handlers[0]
 
399
        specific_filter = None
 
400
        garr = GarrulousFilter()
 
401
        handler.addFilter(garr)
 
402
        try:
 
403
            self.log_at_all_levels(self.root_logger)
 
404
            first_lines = [
 
405
                # Notice how 'Garrulous' is missing
 
406
                ('Boring', '1'),
 
407
                ('Chatterbox', '2'),
 
408
                ('Talkative', '4'),
 
409
                ('Verbose', '5'),
 
410
                ('Sociable', '6'),
 
411
                ('Effusive', '7'),
 
412
                ('Terse', '8'),
 
413
                ('Taciturn', '9'),
 
414
                ('Silent', '10'),
 
415
            ]
 
416
            self.assert_log_lines(first_lines)
 
417
 
 
418
            specific_filter = VerySpecificFilter()
 
419
            self.root_logger.addFilter(specific_filter)
 
420
            self.log_at_all_levels(self.root_logger)
 
421
            self.assert_log_lines(first_lines + [
 
422
                # Not only 'Garrulous' is still missing, but also 'Sociable'
 
423
                # and 'Taciturn'
 
424
                ('Boring', '11'),
 
425
                ('Chatterbox', '12'),
 
426
                ('Talkative', '14'),
 
427
                ('Verbose', '15'),
 
428
                ('Effusive', '17'),
 
429
                ('Terse', '18'),
 
430
                ('Silent', '20'),
 
431
        ])
 
432
        finally:
 
433
            if specific_filter:
 
434
                self.root_logger.removeFilter(specific_filter)
 
435
            handler.removeFilter(garr)
 
436
 
 
437
 
 
438
class MemoryHandlerTest(BaseTest):
 
439
 
 
440
    """Tests for the MemoryHandler."""
 
441
 
 
442
    # Do not bother with a logger name group.
 
443
    expected_log_pat = r"^[\w.]+ -> ([\w]+): ([\d]+)$"
 
444
 
 
445
    def setUp(self):
 
446
        BaseTest.setUp(self)
 
447
        self.mem_hdlr = logging.handlers.MemoryHandler(10, logging.WARNING,
 
448
                                                        self.root_hdlr)
 
449
        self.mem_logger = logging.getLogger('mem')
 
450
        self.mem_logger.propagate = 0
 
451
        self.mem_logger.addHandler(self.mem_hdlr)
 
452
 
 
453
    def tearDown(self):
 
454
        self.mem_hdlr.close()
 
455
        BaseTest.tearDown(self)
 
456
 
 
457
    def test_flush(self):
 
458
        # The memory handler flushes to its target handler based on specific
 
459
        #  criteria (message count and message level).
 
460
        self.mem_logger.debug(self.next_message())
 
461
        self.assert_log_lines([])
 
462
        self.mem_logger.info(self.next_message())
 
463
        self.assert_log_lines([])
 
464
        # This will flush because the level is >= logging.WARNING
 
465
        self.mem_logger.warn(self.next_message())
 
466
        lines = [
 
467
            ('DEBUG', '1'),
 
468
            ('INFO', '2'),
 
469
            ('WARNING', '3'),
 
470
        ]
 
471
        self.assert_log_lines(lines)
 
472
        for n in (4, 14):
 
473
            for i in range(9):
 
474
                self.mem_logger.debug(self.next_message())
 
475
            self.assert_log_lines(lines)
 
476
            # This will flush because it's the 10th message since the last
 
477
            #  flush.
 
478
            self.mem_logger.debug(self.next_message())
 
479
            lines = lines + [('DEBUG', str(i)) for i in range(n, n + 10)]
 
480
            self.assert_log_lines(lines)
 
481
 
 
482
        self.mem_logger.debug(self.next_message())
 
483
        self.assert_log_lines(lines)
 
484
 
 
485
 
 
486
class ExceptionFormatter(logging.Formatter):
 
487
    """A special exception formatter."""
 
488
    def formatException(self, ei):
 
489
        return "Got a [%s]" % ei[0].__name__
 
490
 
 
491
 
 
492
class ConfigFileTest(BaseTest):
 
493
 
 
494
    """Reading logging config from a .ini-style config file."""
 
495
 
 
496
    expected_log_pat = r"^([\w]+) \+\+ ([\w]+)$"
 
497
 
 
498
    # config0 is a standard configuration.
 
499
    config0 = """
 
500
    [loggers]
 
501
    keys=root
 
502
 
 
503
    [handlers]
 
504
    keys=hand1
 
505
 
 
506
    [formatters]
 
507
    keys=form1
 
508
 
 
509
    [logger_root]
 
510
    level=WARNING
 
511
    handlers=hand1
 
512
 
 
513
    [handler_hand1]
 
514
    class=StreamHandler
 
515
    level=NOTSET
 
516
    formatter=form1
 
517
    args=(sys.stdout,)
 
518
 
 
519
    [formatter_form1]
 
520
    format=%(levelname)s ++ %(message)s
 
521
    datefmt=
 
522
    """
 
523
 
 
524
    # config1 adds a little to the standard configuration.
 
525
    config1 = """
 
526
    [loggers]
 
527
    keys=root,parser
 
528
 
 
529
    [handlers]
 
530
    keys=hand1
 
531
 
 
532
    [formatters]
 
533
    keys=form1
 
534
 
 
535
    [logger_root]
 
536
    level=WARNING
 
537
    handlers=
 
538
 
 
539
    [logger_parser]
 
540
    level=DEBUG
 
541
    handlers=hand1
 
542
    propagate=1
 
543
    qualname=compiler.parser
 
544
 
 
545
    [handler_hand1]
 
546
    class=StreamHandler
 
547
    level=NOTSET
 
548
    formatter=form1
 
549
    args=(sys.stdout,)
 
550
 
 
551
    [formatter_form1]
 
552
    format=%(levelname)s ++ %(message)s
 
553
    datefmt=
 
554
    """
 
555
 
 
556
    # config2 has a subtle configuration error that should be reported
 
557
    config2 = config1.replace("sys.stdout", "sys.stbout")
 
558
 
 
559
    # config3 has a less subtle configuration error
 
560
    config3 = config1.replace("formatter=form1", "formatter=misspelled_name")
 
561
 
 
562
    # config4 specifies a custom formatter class to be loaded
 
563
    config4 = """
 
564
    [loggers]
 
565
    keys=root
 
566
 
 
567
    [handlers]
 
568
    keys=hand1
 
569
 
 
570
    [formatters]
 
571
    keys=form1
 
572
 
 
573
    [logger_root]
 
574
    level=NOTSET
 
575
    handlers=hand1
 
576
 
 
577
    [handler_hand1]
 
578
    class=StreamHandler
 
579
    level=NOTSET
 
580
    formatter=form1
 
581
    args=(sys.stdout,)
 
582
 
 
583
    [formatter_form1]
 
584
    class=""" + __name__ + """.ExceptionFormatter
 
585
    format=%(levelname)s:%(name)s:%(message)s
 
586
    datefmt=
 
587
    """
 
588
 
 
589
    # config5 specifies a custom handler class to be loaded
 
590
    config5 = config1.replace('class=StreamHandler', 'class=logging.StreamHandler')
 
591
 
 
592
    # config6 uses ', ' delimiters in the handlers and formatters sections
 
593
    config6 = """
 
594
    [loggers]
 
595
    keys=root,parser
 
596
 
 
597
    [handlers]
 
598
    keys=hand1, hand2
 
599
 
 
600
    [formatters]
 
601
    keys=form1, form2
 
602
 
 
603
    [logger_root]
 
604
    level=WARNING
 
605
    handlers=
 
606
 
 
607
    [logger_parser]
 
608
    level=DEBUG
 
609
    handlers=hand1
 
610
    propagate=1
 
611
    qualname=compiler.parser
 
612
 
 
613
    [handler_hand1]
 
614
    class=StreamHandler
 
615
    level=NOTSET
 
616
    formatter=form1
 
617
    args=(sys.stdout,)
 
618
 
 
619
    [handler_hand2]
 
620
    class=StreamHandler
 
621
    level=NOTSET
 
622
    formatter=form1
 
623
    args=(sys.stderr,)
 
624
 
 
625
    [formatter_form1]
 
626
    format=%(levelname)s ++ %(message)s
 
627
    datefmt=
 
628
 
 
629
    [formatter_form2]
 
630
    format=%(message)s
 
631
    datefmt=
 
632
    """
 
633
 
 
634
    def apply_config(self, conf):
 
635
        try:
 
636
            fn = tempfile.mktemp(".ini")
 
637
            f = open(fn, "w")
 
638
            f.write(textwrap.dedent(conf))
 
639
            f.close()
 
640
            logging.config.fileConfig(fn)
 
641
        finally:
 
642
            os.remove(fn)
 
643
 
 
644
    def test_config0_ok(self):
 
645
        # A simple config file which overrides the default settings.
 
646
        with captured_stdout() as output:
 
647
            self.apply_config(self.config0)
 
648
            logger = logging.getLogger()
 
649
            # Won't output anything
 
650
            logger.info(self.next_message())
 
651
            # Outputs a message
 
652
            logger.error(self.next_message())
 
653
            self.assert_log_lines([
 
654
                ('ERROR', '2'),
 
655
            ], stream=output)
 
656
            # Original logger output is empty.
 
657
            self.assert_log_lines([])
 
658
 
 
659
    def test_config1_ok(self, config=config1):
 
660
        # A config file defining a sub-parser as well.
 
661
        with captured_stdout() as output:
 
662
            self.apply_config(config)
 
663
            logger = logging.getLogger("compiler.parser")
 
664
            # Both will output a message
 
665
            logger.info(self.next_message())
 
666
            logger.error(self.next_message())
 
667
            self.assert_log_lines([
 
668
                ('INFO', '1'),
 
669
                ('ERROR', '2'),
 
670
            ], stream=output)
 
671
            # Original logger output is empty.
 
672
            self.assert_log_lines([])
 
673
 
 
674
    def test_config2_failure(self):
 
675
        # A simple config file which overrides the default settings.
 
676
        self.assertRaises(Exception, self.apply_config, self.config2)
 
677
 
 
678
    def test_config3_failure(self):
 
679
        # A simple config file which overrides the default settings.
 
680
        self.assertRaises(Exception, self.apply_config, self.config3)
 
681
 
 
682
    def test_config4_ok(self):
 
683
        # A config file specifying a custom formatter class.
 
684
        with captured_stdout() as output:
 
685
            self.apply_config(self.config4)
 
686
            logger = logging.getLogger()
 
687
            try:
 
688
                raise RuntimeError()
 
689
            except RuntimeError:
 
690
                logging.exception("just testing")
 
691
            sys.stdout.seek(0)
 
692
            self.assertEquals(output.getvalue(),
 
693
                "ERROR:root:just testing\nGot a [RuntimeError]\n")
 
694
            # Original logger output is empty
 
695
            self.assert_log_lines([])
 
696
 
 
697
    def test_config5_ok(self):
 
698
        self.test_config1_ok(config=self.config5)
 
699
 
 
700
    def test_config6_ok(self):
 
701
        self.test_config1_ok(config=self.config6)
 
702
 
 
703
class LogRecordStreamHandler(StreamRequestHandler):
 
704
 
 
705
    """Handler for a streaming logging request. It saves the log message in the
 
706
    TCP server's 'log_output' attribute."""
 
707
 
 
708
    TCP_LOG_END = "!!!END!!!"
 
709
 
 
710
    def handle(self):
 
711
        """Handle multiple requests - each expected to be of 4-byte length,
 
712
        followed by the LogRecord in pickle format. Logs the record
 
713
        according to whatever policy is configured locally."""
 
714
        while True:
 
715
            chunk = self.connection.recv(4)
 
716
            if len(chunk) < 4:
 
717
                break
 
718
            slen = struct.unpack(">L", chunk)[0]
 
719
            chunk = self.connection.recv(slen)
 
720
            while len(chunk) < slen:
 
721
                chunk = chunk + self.connection.recv(slen - len(chunk))
 
722
            obj = self.unpickle(chunk)
 
723
            record = logging.makeLogRecord(obj)
 
724
            self.handle_log_record(record)
 
725
 
 
726
    def unpickle(self, data):
 
727
        return pickle.loads(data)
 
728
 
 
729
    def handle_log_record(self, record):
 
730
        # If the end-of-messages sentinel is seen, tell the server to
 
731
        #  terminate.
 
732
        if self.TCP_LOG_END in record.msg:
 
733
            self.server.abort = 1
 
734
            return
 
735
        self.server.log_output += record.msg + "\n"
 
736
 
 
737
 
 
738
class LogRecordSocketReceiver(ThreadingTCPServer):
 
739
 
 
740
    """A simple-minded TCP socket-based logging receiver suitable for test
 
741
    purposes."""
 
742
 
 
743
    allow_reuse_address = 1
 
744
    log_output = ""
 
745
 
 
746
    def __init__(self, host='localhost',
 
747
                             port=logging.handlers.DEFAULT_TCP_LOGGING_PORT,
 
748
                     handler=LogRecordStreamHandler):
 
749
        ThreadingTCPServer.__init__(self, (host, port), handler)
 
750
        self.abort = False
 
751
        self.timeout = 0.1
 
752
        self.finished = threading.Event()
 
753
 
 
754
    def serve_until_stopped(self):
 
755
        while not self.abort:
 
756
            rd, wr, ex = select.select([self.socket.fileno()], [], [],
 
757
                                       self.timeout)
 
758
            if rd:
 
759
                self.handle_request()
 
760
        # Notify the main thread that we're about to exit
 
761
        self.finished.set()
 
762
        # close the listen socket
 
763
        self.server_close()
 
764
 
 
765
 
 
766
class SocketHandlerTest(BaseTest):
 
767
 
 
768
    """Test for SocketHandler objects."""
 
769
 
 
770
    def setUp(self):
 
771
        """Set up a TCP server to receive log messages, and a SocketHandler
 
772
        pointing to that server's address and port."""
 
773
        BaseTest.setUp(self)
 
774
        self.tcpserver = LogRecordSocketReceiver(port=0)
 
775
        self.port = self.tcpserver.socket.getsockname()[1]
 
776
        self.threads = [
 
777
                threading.Thread(target=self.tcpserver.serve_until_stopped)]
 
778
        for thread in self.threads:
 
779
            thread.start()
 
780
 
 
781
        self.sock_hdlr = logging.handlers.SocketHandler('localhost', self.port)
 
782
        self.sock_hdlr.setFormatter(self.root_formatter)
 
783
        self.root_logger.removeHandler(self.root_logger.handlers[0])
 
784
        self.root_logger.addHandler(self.sock_hdlr)
 
785
 
 
786
    def tearDown(self):
 
787
        """Shutdown the TCP server."""
 
788
        try:
 
789
            self.tcpserver.abort = True
 
790
            del self.tcpserver
 
791
            self.root_logger.removeHandler(self.sock_hdlr)
 
792
            self.sock_hdlr.close()
 
793
            for thread in self.threads:
 
794
                thread.join(2.0)
 
795
        finally:
 
796
            BaseTest.tearDown(self)
 
797
 
 
798
    def get_output(self):
 
799
        """Get the log output as received by the TCP server."""
 
800
        # Signal the TCP receiver and wait for it to terminate.
 
801
        self.root_logger.critical(LogRecordStreamHandler.TCP_LOG_END)
 
802
        self.tcpserver.finished.wait(2.0)
 
803
        return self.tcpserver.log_output
 
804
 
 
805
    def test_output(self):
 
806
        # The log message sent to the SocketHandler is properly received.
 
807
        logger = logging.getLogger("tcp")
 
808
        logger.error("spam")
 
809
        logger.debug("eggs")
 
810
        self.assertEquals(self.get_output(), "spam\neggs\n")
 
811
 
 
812
 
 
813
class MemoryTest(BaseTest):
 
814
 
 
815
    """Test memory persistence of logger objects."""
 
816
 
 
817
    def setUp(self):
 
818
        """Create a dict to remember potentially destroyed objects."""
 
819
        BaseTest.setUp(self)
 
820
        self._survivors = {}
 
821
 
 
822
    def _watch_for_survival(self, *args):
 
823
        """Watch the given objects for survival, by creating weakrefs to
 
824
        them."""
 
825
        for obj in args:
 
826
            key = id(obj), repr(obj)
 
827
            self._survivors[key] = weakref.ref(obj)
 
828
 
 
829
    def _assert_survival(self):
 
830
        """Assert that all objects watched for survival have survived."""
 
831
        # Trigger cycle breaking.
 
832
        gc.collect()
 
833
        dead = []
 
834
        for (id_, repr_), ref in list(self._survivors.items()):
 
835
            if ref() is None:
 
836
                dead.append(repr_)
 
837
        if dead:
 
838
            self.fail("%d objects should have survived "
 
839
                "but have been destroyed: %s" % (len(dead), ", ".join(dead)))
 
840
 
 
841
    def test_persistent_loggers(self):
 
842
        # Logger objects are persistent and retain their configuration, even
 
843
        #  if visible references are destroyed.
 
844
        self.root_logger.setLevel(logging.INFO)
 
845
        foo = logging.getLogger("foo")
 
846
        self._watch_for_survival(foo)
 
847
        foo.setLevel(logging.DEBUG)
 
848
        self.root_logger.debug(self.next_message())
 
849
        foo.debug(self.next_message())
 
850
        self.assert_log_lines([
 
851
            ('foo', 'DEBUG', '2'),
 
852
        ])
 
853
        del foo
 
854
        # foo has survived.
 
855
        self._assert_survival()
 
856
        # foo has retained its settings.
 
857
        bar = logging.getLogger("foo")
 
858
        bar.debug(self.next_message())
 
859
        self.assert_log_lines([
 
860
            ('foo', 'DEBUG', '2'),
 
861
            ('foo', 'DEBUG', '3'),
 
862
        ])
 
863
 
 
864
 
 
865
class EncodingTest(BaseTest):
 
866
    def test_encoding_plain_file(self):
 
867
        # In Python 2.x, a plain file object is treated as having no encoding.
 
868
        log = logging.getLogger("test")
 
869
        fn = tempfile.mktemp(".log")
 
870
        # the non-ascii data we write to the log.
 
871
        data = "foo\x80"
 
872
        try:
 
873
            handler = logging.FileHandler(fn, encoding="utf8")
 
874
            log.addHandler(handler)
 
875
            try:
 
876
                # write non-ascii data to the log.
 
877
                log.warning(data)
 
878
            finally:
 
879
                log.removeHandler(handler)
 
880
                handler.close()
 
881
            # check we wrote exactly those bytes, ignoring trailing \n etc
 
882
            f = open(fn, encoding="utf8")
 
883
            try:
 
884
                self.failUnlessEqual(f.read().rstrip(), data)
 
885
            finally:
 
886
                f.close()
 
887
        finally:
 
888
            if os.path.isfile(fn):
 
889
                os.remove(fn)
 
890
 
 
891
    def test_encoding_cyrillic_unicode(self):
 
892
        log = logging.getLogger("test")
 
893
        #Get a message in Unicode: Do svidanya in Cyrillic (meaning goodbye)
 
894
        message = '\u0434\u043e \u0441\u0432\u0438\u0434\u0430\u043d\u0438\u044f'
 
895
        #Ensure it's written in a Cyrillic encoding
 
896
        writer_class = codecs.getwriter('cp1251')
 
897
        stream = io.BytesIO()
 
898
        writer = writer_class(stream, 'strict')
 
899
        handler = logging.StreamHandler(writer)
 
900
        log.addHandler(handler)
 
901
        try:
 
902
            log.warning(message)
 
903
        finally:
 
904
            log.removeHandler(handler)
 
905
            handler.close()
 
906
        # check we wrote exactly those bytes, ignoring trailing \n etc
 
907
        s = stream.getvalue()
 
908
        #Compare against what the data should be when encoded in CP-1251
 
909
        self.assertEqual(s, b'\xe4\xee \xf1\xe2\xe8\xe4\xe0\xed\xe8\xff\n')
 
910
 
 
911
 
 
912
class WarningsTest(BaseTest):
 
913
    def test_warnings(self):
 
914
        logging.captureWarnings(True)
 
915
        warnings.filterwarnings("always", category=UserWarning)
 
916
        try:
 
917
            file = io.StringIO()
 
918
            h = logging.StreamHandler(file)
 
919
            logger = logging.getLogger("py.warnings")
 
920
            logger.addHandler(h)
 
921
            warnings.warn("I'm warning you...")
 
922
            logger.removeHandler(h)
 
923
            s = file.getvalue()
 
924
            h.close()
 
925
            self.assertTrue(s.find("UserWarning: I'm warning you...\n") > 0)
 
926
 
 
927
            #See if an explicit file uses the original implementation
 
928
            file = io.StringIO()
 
929
            warnings.showwarning("Explicit", UserWarning, "dummy.py", 42, file,
 
930
                                 "Dummy line")
 
931
            s = file.getvalue()
 
932
            file.close()
 
933
            self.assertEqual(s, "dummy.py:42: UserWarning: Explicit\n  Dummy line\n")
 
934
        finally:
 
935
            warnings.resetwarnings()
 
936
            logging.captureWarnings(False)
 
937
 
 
938
# Set the locale to the platform-dependent default.  I have no idea
 
939
# why the test does this, but in any case we save the current locale
 
940
# first and restore it at the end.
 
941
@run_with_locale('LC_ALL', '')
 
942
def test_main():
 
943
    run_unittest(BuiltinLevelsTest, BasicFilterTest,
 
944
                    CustomLevelsAndFiltersTest, MemoryHandlerTest,
 
945
                    ConfigFileTest, SocketHandlerTest, MemoryTest,
 
946
                    EncodingTest, WarningsTest)
 
947
 
 
948
if __name__ == "__main__":
 
949
    test_main()