~0x44/nova/bug838466

« back to all changes in this revision

Viewing changes to vendor/Twisted-10.0.0/twisted/test/test_log.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (c) 2001-2010 Twisted Matrix Laboratories.
 
2
# See LICENSE for details.
 
3
 
 
4
"""
 
5
Tests for L{twisted.python.log}.
 
6
"""
 
7
 
 
8
import os, sys, time, logging, warnings
 
9
from cStringIO import StringIO
 
10
 
 
11
from twisted.trial import unittest
 
12
 
 
13
from twisted.python import log, failure
 
14
 
 
15
 
 
16
class FakeWarning(Warning):
 
17
    """
 
18
    A unique L{Warning} subclass used by tests for interactions of
 
19
    L{twisted.python.log} with the L{warnings} module.
 
20
    """
 
21
 
 
22
 
 
23
 
 
24
class LogTest(unittest.TestCase):
 
25
 
 
26
    def setUp(self):
 
27
        self.catcher = []
 
28
        self.observer = self.catcher.append
 
29
        log.addObserver(self.observer)
 
30
        self.addCleanup(log.removeObserver, self.observer)
 
31
 
 
32
 
 
33
    def testObservation(self):
 
34
        catcher = self.catcher
 
35
        log.msg("test", testShouldCatch=True)
 
36
        i = catcher.pop()
 
37
        self.assertEquals(i["message"][0], "test")
 
38
        self.assertEquals(i["testShouldCatch"], True)
 
39
        self.failUnless(i.has_key("time"))
 
40
        self.assertEquals(len(catcher), 0)
 
41
 
 
42
 
 
43
    def testContext(self):
 
44
        catcher = self.catcher
 
45
        log.callWithContext({"subsystem": "not the default",
 
46
                             "subsubsystem": "a",
 
47
                             "other": "c"},
 
48
                            log.callWithContext,
 
49
                            {"subsubsystem": "b"}, log.msg, "foo", other="d")
 
50
        i = catcher.pop()
 
51
        self.assertEquals(i['subsubsystem'], 'b')
 
52
        self.assertEquals(i['subsystem'], 'not the default')
 
53
        self.assertEquals(i['other'], 'd')
 
54
        self.assertEquals(i['message'][0], 'foo')
 
55
 
 
56
    def testErrors(self):
 
57
        for e, ig in [("hello world","hello world"),
 
58
                      (KeyError(), KeyError),
 
59
                      (failure.Failure(RuntimeError()), RuntimeError)]:
 
60
            log.err(e)
 
61
            i = self.catcher.pop()
 
62
            self.assertEquals(i['isError'], 1)
 
63
            self.flushLoggedErrors(ig)
 
64
 
 
65
    def testErrorsWithWhy(self):
 
66
        for e, ig in [("hello world","hello world"),
 
67
                      (KeyError(), KeyError),
 
68
                      (failure.Failure(RuntimeError()), RuntimeError)]:
 
69
            log.err(e, 'foobar')
 
70
            i = self.catcher.pop()
 
71
            self.assertEquals(i['isError'], 1)
 
72
            self.assertEquals(i['why'], 'foobar')
 
73
            self.flushLoggedErrors(ig)
 
74
 
 
75
 
 
76
    def test_erroneousErrors(self):
 
77
        """
 
78
        Exceptions raised by log observers are logged but the observer which
 
79
        raised the exception remains registered with the publisher.  These
 
80
        exceptions do not prevent the event from being sent to other observers
 
81
        registered with the publisher.
 
82
        """
 
83
        L1 = []
 
84
        L2 = []
 
85
        def broken(events):
 
86
            1 / 0
 
87
 
 
88
        for observer in [L1.append, broken, L2.append]:
 
89
            log.addObserver(observer)
 
90
            self.addCleanup(log.removeObserver, observer)
 
91
 
 
92
        for i in xrange(3):
 
93
            # Reset the lists for simpler comparison.
 
94
            L1[:] = []
 
95
            L2[:] = []
 
96
 
 
97
            # Send out the event which will break one of the observers.
 
98
            log.msg("Howdy, y'all.")
 
99
 
 
100
            # The broken observer should have caused this to be logged.  There
 
101
            # is a slight bug with LogPublisher - when it logs an error from an
 
102
            # observer, it uses the global "err", which is not necessarily
 
103
            # associated with it, but which may be associated with a different
 
104
            # LogPublisher!  See #3307.
 
105
            excs = self.flushLoggedErrors(ZeroDivisionError)
 
106
            self.assertEqual(len(excs), 1)
 
107
 
 
108
            # Both other observers should have seen the message.
 
109
            self.assertEquals(len(L1), 2)
 
110
            self.assertEquals(len(L2), 2)
 
111
 
 
112
            # The order is slightly wrong here.  The first event should be
 
113
            # delivered to all observers; then, errors should be delivered.
 
114
            self.assertEquals(L1[1]['message'], ("Howdy, y'all.",))
 
115
            self.assertEquals(L2[0]['message'], ("Howdy, y'all.",))
 
116
 
 
117
 
 
118
    def test_showwarning(self):
 
119
        """
 
120
        L{twisted.python.log.showwarning} emits the warning as a message
 
121
        to the Twisted logging system.
 
122
        """
 
123
        publisher = log.LogPublisher()
 
124
        publisher.addObserver(self.observer)
 
125
 
 
126
        publisher.showwarning(
 
127
            FakeWarning("unique warning message"), FakeWarning,
 
128
            "warning-filename.py", 27)
 
129
        event = self.catcher.pop()
 
130
        self.assertEqual(
 
131
            event['format'] % event,
 
132
            'warning-filename.py:27: twisted.test.test_log.FakeWarning: '
 
133
            'unique warning message')
 
134
        self.assertEqual(self.catcher, [])
 
135
 
 
136
        # Python 2.6 requires that any function used to override the
 
137
        # warnings.showwarning API accept a "line" parameter or a
 
138
        # deprecation warning is emitted.
 
139
        publisher.showwarning(
 
140
            FakeWarning("unique warning message"), FakeWarning,
 
141
            "warning-filename.py", 27, line=object())
 
142
        event = self.catcher.pop()
 
143
        self.assertEqual(
 
144
            event['format'] % event,
 
145
            'warning-filename.py:27: twisted.test.test_log.FakeWarning: '
 
146
            'unique warning message')
 
147
        self.assertEqual(self.catcher, [])
 
148
 
 
149
 
 
150
    def test_warningToFile(self):
 
151
        """
 
152
        L{twisted.python.log.showwarning} passes warnings with an explicit file
 
153
        target on to the underlying Python warning system.
 
154
        """
 
155
        message = "another unique message"
 
156
        category = FakeWarning
 
157
        filename = "warning-filename.py"
 
158
        lineno = 31
 
159
 
 
160
        output = StringIO()
 
161
        log.showwarning(message, category, filename, lineno, file=output)
 
162
 
 
163
        self.assertEqual(
 
164
            output.getvalue(),
 
165
            warnings.formatwarning(message, category, filename, lineno))
 
166
 
 
167
        # In Python 2.6, warnings.showwarning accepts a "line" argument which
 
168
        # gives the source line the warning message is to include.
 
169
        if sys.version_info >= (2, 6):
 
170
            line = "hello world"
 
171
            output = StringIO()
 
172
            log.showwarning(message, category, filename, lineno, file=output,
 
173
                            line=line)
 
174
 
 
175
            self.assertEqual(
 
176
                output.getvalue(),
 
177
                warnings.formatwarning(message, category, filename, lineno,
 
178
                                       line))
 
179
 
 
180
 
 
181
class FakeFile(list):
 
182
    def write(self, bytes):
 
183
        self.append(bytes)
 
184
 
 
185
    def flush(self):
 
186
        pass
 
187
 
 
188
class EvilStr:
 
189
    def __str__(self):
 
190
        1/0
 
191
 
 
192
class EvilRepr:
 
193
    def __str__(self):
 
194
        return "Happy Evil Repr"
 
195
    def __repr__(self):
 
196
        1/0
 
197
 
 
198
class EvilReprStr(EvilStr, EvilRepr):
 
199
    pass
 
200
 
 
201
class LogPublisherTestCaseMixin:
 
202
    def setUp(self):
 
203
        """
 
204
        Add a log observer which records log events in C{self.out}.  Also,
 
205
        make sure the default string encoding is ASCII so that
 
206
        L{testSingleUnicode} can test the behavior of logging unencodable
 
207
        unicode messages.
 
208
        """
 
209
        self.out = FakeFile()
 
210
        self.lp = log.LogPublisher()
 
211
        self.flo = log.FileLogObserver(self.out)
 
212
        self.lp.addObserver(self.flo.emit)
 
213
 
 
214
        try:
 
215
            str(u'\N{VULGAR FRACTION ONE HALF}')
 
216
        except UnicodeEncodeError:
 
217
            # This is the behavior we want - don't change anything.
 
218
            self._origEncoding = None
 
219
        else:
 
220
            reload(sys)
 
221
            self._origEncoding = sys.getdefaultencoding()
 
222
            sys.setdefaultencoding('ascii')
 
223
 
 
224
 
 
225
    def tearDown(self):
 
226
        """
 
227
        Verify that everything written to the fake file C{self.out} was a
 
228
        C{str}.  Also, restore the default string encoding to its previous
 
229
        setting, if it was modified by L{setUp}.
 
230
        """
 
231
        for chunk in self.out:
 
232
            self.failUnless(isinstance(chunk, str), "%r was not a string" % (chunk,))
 
233
 
 
234
        if self._origEncoding is not None:
 
235
            sys.setdefaultencoding(self._origEncoding)
 
236
            del sys.setdefaultencoding
 
237
 
 
238
 
 
239
 
 
240
class LogPublisherTestCase(LogPublisherTestCaseMixin, unittest.TestCase):
 
241
    def testSingleString(self):
 
242
        self.lp.msg("Hello, world.")
 
243
        self.assertEquals(len(self.out), 1)
 
244
 
 
245
 
 
246
    def testMultipleString(self):
 
247
        # Test some stupid behavior that will be deprecated real soon.
 
248
        # If you are reading this and trying to learn how the logging
 
249
        # system works, *do not use this feature*.
 
250
        self.lp.msg("Hello, ", "world.")
 
251
        self.assertEquals(len(self.out), 1)
 
252
 
 
253
 
 
254
    def testSingleUnicode(self):
 
255
        self.lp.msg(u"Hello, \N{VULGAR FRACTION ONE HALF} world.")
 
256
        self.assertEquals(len(self.out), 1)
 
257
        self.assertIn('with str error', self.out[0])
 
258
        self.assertIn('UnicodeEncodeError', self.out[0])
 
259
 
 
260
 
 
261
 
 
262
class FileObserverTestCase(LogPublisherTestCaseMixin, unittest.TestCase):
 
263
    def test_getTimezoneOffset(self):
 
264
        """
 
265
        Attempt to verify that L{FileLogObserver.getTimezoneOffset} returns
 
266
        correct values for the current C{TZ} environment setting.  Do this
 
267
        by setting C{TZ} to various well-known values and asserting that the
 
268
        reported offset is correct.
 
269
        """
 
270
        localDaylightTuple = (2006, 6, 30, 0, 0, 0, 4, 181, 1)
 
271
        utcDaylightTimestamp = time.mktime(localDaylightTuple)
 
272
        localStandardTuple = (2007, 1, 31, 0, 0, 0, 2, 31, 0)
 
273
        utcStandardTimestamp = time.mktime(localStandardTuple)
 
274
 
 
275
        originalTimezone = os.environ.get('TZ', None)
 
276
        try:
 
277
            # Test something west of UTC
 
278
            os.environ['TZ'] = 'America/New_York'
 
279
            time.tzset()
 
280
            self.assertEqual(
 
281
                self.flo.getTimezoneOffset(utcDaylightTimestamp),
 
282
                14400)
 
283
            self.assertEqual(
 
284
                self.flo.getTimezoneOffset(utcStandardTimestamp),
 
285
                18000)
 
286
 
 
287
            # Test something east of UTC
 
288
            os.environ['TZ'] = 'Europe/Berlin'
 
289
            time.tzset()
 
290
            self.assertEqual(
 
291
                self.flo.getTimezoneOffset(utcDaylightTimestamp),
 
292
                -7200)
 
293
            self.assertEqual(
 
294
                self.flo.getTimezoneOffset(utcStandardTimestamp),
 
295
                -3600)
 
296
 
 
297
            # Test a timezone that doesn't have DST
 
298
            os.environ['TZ'] = 'Africa/Johannesburg'
 
299
            time.tzset()
 
300
            self.assertEqual(
 
301
                self.flo.getTimezoneOffset(utcDaylightTimestamp),
 
302
                -7200)
 
303
            self.assertEqual(
 
304
                self.flo.getTimezoneOffset(utcStandardTimestamp),
 
305
                -7200)
 
306
        finally:
 
307
            if originalTimezone is None:
 
308
                del os.environ['TZ']
 
309
            else:
 
310
                os.environ['TZ'] = originalTimezone
 
311
            time.tzset()
 
312
    if getattr(time, 'tzset', None) is None:
 
313
        test_getTimezoneOffset.skip = (
 
314
            "Platform cannot change timezone, cannot verify correct offsets "
 
315
            "in well-known timezones.")
 
316
 
 
317
 
 
318
    def test_timeFormatting(self):
 
319
        """
 
320
        Test the method of L{FileLogObserver} which turns a timestamp into a
 
321
        human-readable string.
 
322
        """
 
323
        # There is no function in the time module which converts a UTC time
 
324
        # tuple to a timestamp.
 
325
        when = time.mktime((2001, 2, 3, 4, 5, 6, 7, 8, 0)) - time.timezone
 
326
 
 
327
        # Pretend to be in US/Eastern for a moment
 
328
        self.flo.getTimezoneOffset = lambda when: 18000
 
329
        self.assertEquals(self.flo.formatTime(when), '2001-02-02 23:05:06-0500')
 
330
 
 
331
        # Okay now we're in Eastern Europe somewhere
 
332
        self.flo.getTimezoneOffset = lambda when: -3600
 
333
        self.assertEquals(self.flo.formatTime(when), '2001-02-03 05:05:06+0100')
 
334
 
 
335
        # And off in the Pacific or someplace like that
 
336
        self.flo.getTimezoneOffset = lambda when: -39600
 
337
        self.assertEquals(self.flo.formatTime(when), '2001-02-03 15:05:06+1100')
 
338
 
 
339
        # One of those weird places with a half-hour offset timezone
 
340
        self.flo.getTimezoneOffset = lambda when: 5400
 
341
        self.assertEquals(self.flo.formatTime(when), '2001-02-03 02:35:06-0130')
 
342
 
 
343
        # Half-hour offset in the other direction
 
344
        self.flo.getTimezoneOffset = lambda when: -5400
 
345
        self.assertEquals(self.flo.formatTime(when), '2001-02-03 05:35:06+0130')
 
346
 
 
347
        # Test an offset which is between 0 and 60 minutes to make sure the
 
348
        # sign comes out properly in that case.
 
349
        self.flo.getTimezoneOffset = lambda when: 1800
 
350
        self.assertEquals(self.flo.formatTime(when), '2001-02-03 03:35:06-0030')
 
351
 
 
352
        # Test an offset between 0 and 60 minutes in the other direction.
 
353
        self.flo.getTimezoneOffset = lambda when: -1800
 
354
        self.assertEquals(self.flo.formatTime(when), '2001-02-03 04:35:06+0030')
 
355
 
 
356
        # If a strftime-format string is present on the logger, it should
 
357
        # use that instead.  Note we don't assert anything about day, hour
 
358
        # or minute because we cannot easily control what time.strftime()
 
359
        # thinks the local timezone is.
 
360
        self.flo.timeFormat = '%Y %m'
 
361
        self.assertEquals(self.flo.formatTime(when), '2001 02')
 
362
 
 
363
 
 
364
    def test_loggingAnObjectWithBroken__str__(self):
 
365
        #HELLO, MCFLY
 
366
        self.lp.msg(EvilStr())
 
367
        self.assertEquals(len(self.out), 1)
 
368
        # Logging system shouldn't need to crap itself for this trivial case
 
369
        self.assertNotIn('UNFORMATTABLE', self.out[0])
 
370
 
 
371
 
 
372
    def test_formattingAnObjectWithBroken__str__(self):
 
373
        self.lp.msg(format='%(blat)s', blat=EvilStr())
 
374
        self.assertEquals(len(self.out), 1)
 
375
        self.assertIn('Invalid format string or unformattable object', self.out[0])
 
376
 
 
377
 
 
378
    def test_brokenSystem__str__(self):
 
379
        self.lp.msg('huh', system=EvilStr())
 
380
        self.assertEquals(len(self.out), 1)
 
381
        self.assertIn('Invalid format string or unformattable object', self.out[0])
 
382
 
 
383
 
 
384
    def test_formattingAnObjectWithBroken__repr__Indirect(self):
 
385
        self.lp.msg(format='%(blat)s', blat=[EvilRepr()])
 
386
        self.assertEquals(len(self.out), 1)
 
387
        self.assertIn('UNFORMATTABLE OBJECT', self.out[0])
 
388
 
 
389
 
 
390
    def test_systemWithBroker__repr__Indirect(self):
 
391
        self.lp.msg('huh', system=[EvilRepr()])
 
392
        self.assertEquals(len(self.out), 1)
 
393
        self.assertIn('UNFORMATTABLE OBJECT', self.out[0])
 
394
 
 
395
 
 
396
    def test_simpleBrokenFormat(self):
 
397
        self.lp.msg(format='hooj %s %s', blat=1)
 
398
        self.assertEquals(len(self.out), 1)
 
399
        self.assertIn('Invalid format string or unformattable object', self.out[0])
 
400
 
 
401
 
 
402
    def test_ridiculousFormat(self):
 
403
        self.lp.msg(format=42, blat=1)
 
404
        self.assertEquals(len(self.out), 1)
 
405
        self.assertIn('Invalid format string or unformattable object', self.out[0])
 
406
 
 
407
 
 
408
    def test_evilFormat__repr__And__str__(self):
 
409
        self.lp.msg(format=EvilReprStr(), blat=1)
 
410
        self.assertEquals(len(self.out), 1)
 
411
        self.assertIn('PATHOLOGICAL', self.out[0])
 
412
 
 
413
 
 
414
    def test_strangeEventDict(self):
 
415
        """
 
416
        This kind of eventDict used to fail silently, so test it does.
 
417
        """
 
418
        self.lp.msg(message='', isError=False)
 
419
        self.assertEquals(len(self.out), 0)
 
420
 
 
421
 
 
422
    def test_startLoggingTwice(self):
 
423
        """
 
424
        There are some obscure error conditions that can occur when logging is
 
425
        started twice. See http://twistedmatrix.com/trac/ticket/3289 for more
 
426
        information.
 
427
        """
 
428
        # The bug is particular to the way that the t.p.log 'global' function
 
429
        # handle stdout. If we use our own stream, the error doesn't occur. If
 
430
        # we use our own LogPublisher, the error doesn't occur.
 
431
        sys.stdout = StringIO()
 
432
        self.addCleanup(setattr, sys, 'stdout', sys.__stdout__)
 
433
 
 
434
        def showError(eventDict):
 
435
            if eventDict['isError']:
 
436
                sys.__stdout__.write(eventDict['failure'].getTraceback())
 
437
 
 
438
        log.addObserver(showError)
 
439
        self.addCleanup(log.removeObserver, showError)
 
440
        observer = log.startLogging(sys.stdout)
 
441
        self.addCleanup(observer.stop)
 
442
        # At this point, we expect that sys.stdout is a StdioOnnaStick object.
 
443
        self.assertIsInstance(sys.stdout, log.StdioOnnaStick)
 
444
        fakeStdout = sys.stdout
 
445
        observer = log.startLogging(sys.stdout)
 
446
        self.assertIdentical(sys.stdout, fakeStdout)
 
447
 
 
448
 
 
449
class PythonLoggingObserverTestCase(unittest.TestCase):
 
450
    """
 
451
    Test the bridge with python logging module.
 
452
    """
 
453
    def setUp(self):
 
454
        self.out = StringIO()
 
455
 
 
456
        rootLogger = logging.getLogger("")
 
457
        self.originalLevel = rootLogger.getEffectiveLevel()
 
458
        rootLogger.setLevel(logging.DEBUG)
 
459
        self.hdlr = logging.StreamHandler(self.out)
 
460
        fmt = logging.Formatter(logging.BASIC_FORMAT)
 
461
        self.hdlr.setFormatter(fmt)
 
462
        rootLogger.addHandler(self.hdlr)
 
463
 
 
464
        self.lp = log.LogPublisher()
 
465
        self.obs = log.PythonLoggingObserver()
 
466
        self.lp.addObserver(self.obs.emit)
 
467
 
 
468
    def tearDown(self):
 
469
        rootLogger = logging.getLogger("")
 
470
        rootLogger.removeHandler(self.hdlr)
 
471
        rootLogger.setLevel(self.originalLevel)
 
472
        logging.shutdown()
 
473
 
 
474
    def test_singleString(self):
 
475
        """
 
476
        Test simple output, and default log level.
 
477
        """
 
478
        self.lp.msg("Hello, world.")
 
479
        self.assertIn("Hello, world.", self.out.getvalue())
 
480
        self.assertIn("INFO", self.out.getvalue())
 
481
 
 
482
    def test_errorString(self):
 
483
        """
 
484
        Test error output.
 
485
        """
 
486
        self.lp.msg(failure=failure.Failure(ValueError("That is bad.")), isError=True)
 
487
        self.assertIn("ERROR", self.out.getvalue())
 
488
 
 
489
    def test_formatString(self):
 
490
        """
 
491
        Test logging with a format.
 
492
        """
 
493
        self.lp.msg(format="%(bar)s oo %(foo)s", bar="Hello", foo="world")
 
494
        self.assertIn("Hello oo world", self.out.getvalue())
 
495
 
 
496
    def test_customLevel(self):
 
497
        """
 
498
        Test the logLevel keyword for customizing level used.
 
499
        """
 
500
        self.lp.msg("Spam egg.", logLevel=logging.DEBUG)
 
501
        self.assertIn("Spam egg.", self.out.getvalue())
 
502
        self.assertIn("DEBUG", self.out.getvalue())
 
503
        self.out.reset()
 
504
        self.lp.msg("Foo bar.", logLevel=logging.WARNING)
 
505
        self.assertIn("Foo bar.", self.out.getvalue())
 
506
        self.assertIn("WARNING", self.out.getvalue())
 
507
 
 
508
    def test_strangeEventDict(self):
 
509
        """
 
510
        Verify that an event dictionary which is not an error and has an empty
 
511
        message isn't recorded.
 
512
        """
 
513
        self.lp.msg(message='', isError=False)
 
514
        self.assertEquals(self.out.getvalue(), '')
 
515
 
 
516
 
 
517
class PythonLoggingIntegrationTestCase(unittest.TestCase):
 
518
    """
 
519
    Test integration of python logging bridge.
 
520
    """
 
521
    def test_startStopObserver(self):
 
522
        """
 
523
        Test that start and stop methods of the observer actually register
 
524
        and unregister to the log system.
 
525
        """
 
526
        oldAddObserver = log.addObserver
 
527
        oldRemoveObserver = log.removeObserver
 
528
        l = []
 
529
        try:
 
530
            log.addObserver = l.append
 
531
            log.removeObserver = l.remove
 
532
            obs = log.PythonLoggingObserver()
 
533
            obs.start()
 
534
            self.assertEquals(l[0], obs.emit)
 
535
            obs.stop()
 
536
            self.assertEquals(len(l), 0)
 
537
        finally:
 
538
            log.addObserver = oldAddObserver
 
539
            log.removeObserver = oldRemoveObserver
 
540
 
 
541
    def test_inheritance(self):
 
542
        """
 
543
        Test that we can inherit L{log.PythonLoggingObserver} and use super:
 
544
        that's basically a validation that L{log.PythonLoggingObserver} is
 
545
        new-style class.
 
546
        """
 
547
        class MyObserver(log.PythonLoggingObserver):
 
548
            def emit(self, eventDict):
 
549
                super(MyObserver, self).emit(eventDict)
 
550
        obs = MyObserver()
 
551
        l = []
 
552
        oldEmit = log.PythonLoggingObserver.emit
 
553
        try:
 
554
            log.PythonLoggingObserver.emit = l.append
 
555
            obs.emit('foo')
 
556
            self.assertEquals(len(l), 1)
 
557
        finally:
 
558
            log.PythonLoggingObserver.emit = oldEmit
 
559