~marco-giusti/virtualbrick/deb-package

« back to all changes in this revision

Viewing changes to virtualbricks/tests/test_log.py

MergeĀ fromĀ upstream

Show diffs side-by-side

added added

removed removed

Lines of Context:
6
6
from virtualbricks import log
7
7
 
8
8
 
9
 
log.replaceTwistedLoggers()
10
9
logger = log.Logger()
11
10
test_event = log.Event("This is a test event")
12
11
test_event_2 = log.Event("This is another test event")
13
12
 
14
13
 
15
 
class TestCase(unittest.TestCase):
16
 
 
17
 
    def get_logger(self):
18
 
        ev = []
19
 
        obs = lambda e: ev.append(e)
20
 
        logger.publisher.addObserver(obs, False)
21
 
        self.addCleanup(logger.publisher.removeObserver, obs)
22
 
        return ev
23
 
 
24
 
 
25
 
DEFAULT_ATTRS = [
26
 
    "log_format",
27
 
    "log_id",
28
 
    "log_level",
29
 
    "log_logger",
30
 
    "log_namespace",
31
 
    "log_source",
32
 
    "log_time"
33
 
]
34
 
EXTENDED_ARGS = sorted(DEFAULT_ATTRS + ["format", "logLevel", "log_legacy"])
35
 
 
36
 
 
37
 
class TestLog(TestCase):
 
14
class Observer:
 
15
 
 
16
    def __init__(self):
 
17
        self.events = []
 
18
 
 
19
    def __call__(self, event):
 
20
        self.events.append(event)
 
21
 
 
22
 
 
23
class EventCmp:
 
24
 
 
25
    def __init__(self, event):
 
26
        self.event = event
 
27
 
 
28
    def __eq__(self, other):
 
29
        if isinstance(other, dict):
 
30
            return self.event.is_(other)
 
31
        return NotImplemented
 
32
 
 
33
    def __ne__(self, other):
 
34
        return not self.__eq__(other)
 
35
 
 
36
 
 
37
def install_observer(test_case):
 
38
    observer = Observer()
 
39
    logger.publisher.addObserver(observer)
 
40
    test_case.addCleanup(logger.publisher.removeObserver, observer)
 
41
    return observer
 
42
 
 
43
 
 
44
class TestLog(unittest.TestCase):
 
45
 
 
46
    def setUp(self):
 
47
        self.observer = install_observer(self)
 
48
 
 
49
    def test_log(self):
 
50
        """Send a simple event."""
 
51
 
 
52
        logger.info(test_event)
 
53
        self.assertEqual(self.observer.events, [EventCmp(test_event)])
38
54
 
39
55
    def test_tap(self):
40
 
        """Collect only specific events."""
 
56
        """Collect only specific events. Here test_event_2 is not collected."""
41
57
 
42
 
        ev = []
43
 
        self.addCleanup(test_event.tap(ev.append, logger.publisher))
 
58
        observer = Observer()
 
59
        self.addCleanup(test_event.tap(observer, logger.publisher))
44
60
        logger.info(test_event)
45
61
        logger.info(test_event_2)
46
 
        self.assertEqual(len(ev), 1)
47
 
        self.assertIn("log_id", ev[0])
48
 
        self.assertEqual(ev[0]["log_id"], test_event.log_id)
49
 
 
50
 
    def _test_event_attrs(self, emit, attrs):
51
 
        """Test the attributes of an event."""
52
 
 
53
 
        events = self.get_logger()
54
 
        emit()
55
 
        self.assertEqual(len(events), 1)
56
 
        self.assertIs(type(events[0]), dict)
57
 
        self.assertEqual(sorted(events[0].keys()), attrs)
 
62
        self.assertEqual(observer.events, [EventCmp(test_event)])
58
63
 
59
64
    def test_info_event_attrs(self):
60
65
        """
63
68
        The LegacyLogObserver add some extra attributes to the event...
64
69
        """
65
70
 
66
 
        self._test_event_attrs(lambda: logger.info(test_event), EXTENDED_ARGS)
67
 
 
68
 
    def test_debug_event_attrs(self):
69
 
        """
70
 
        Test the attributes of an event of level LogLevel.DEBUG.
71
 
 
72
 
        ...but The LegacyLogObserver is filtered so is not called for the event
73
 
        with level LogLevel.DEBUG.
74
 
        """
75
 
 
76
 
        self._test_event_attrs(lambda: logger.debug(test_event), DEFAULT_ATTRS)
77
 
 
78
 
    def test_legacy_observer(self):
 
71
        logger.info(test_event)
 
72
        self.assertEqual(self.observer.events, [EventCmp(test_event)])
 
73
        self.assertEqual(sorted(self.observer.events[0].keys()),
 
74
                         ["format", "logLevel", "log_format", "log_id",
 
75
                          "log_legacy", "log_level", "log_logger",
 
76
                          "log_namespace", "log_source", "log_time"])
 
77
 
 
78
    def test_filter_event(self):
 
79
        """Events can be filtered."""
 
80
 
 
81
        logger.publisher.levels.setLogLevelForNamespace(
 
82
            "virtualbricks.tests.test_log", log.LogLevel.warn)
 
83
        self.addCleanup(logger.publisher.levels.clearLogLevels)
 
84
        logger.info(test_event)
 
85
        self.assertEqual(len(self.observer.events), 0)
 
86
 
 
87
    def test_legacy_emitter(self):
79
88
        """Test the events logged with the legacy logger are not lost."""
80
89
 
81
 
        ev = self.get_logger()
 
90
        observer = log.LegacyAdapter()
 
91
        legacylog.addObserver(observer)
 
92
        self.addCleanup(legacylog.removeObserver, observer)
82
93
        legacylog.msg("test")
83
94
        legacylog.err(RuntimeError("error"))
84
 
        self.assertEqual(len(self.flushLoggedErrors(Exception)), 1)
85
 
        self.assertEqual(len(ev), 2)
86
 
 
87
 
    def test_legacy_observer_event_attrs(self):
88
 
        """
89
 
        Events logged with the legacy logger does not have the log_id
90
 
        attribute.
91
 
        """
92
 
 
93
 
        attrs = EXTENDED_ARGS[:]
94
 
        attrs.remove("log_id")
95
 
        self._test_event_attrs(lambda: legacylog.msg("test"), attrs)
96
 
 
97
 
 
98
 
class TestStdLogging(TestCase):
 
95
        err = self.flushLoggedErrors(RuntimeError)
 
96
        self.assertEqual(len(self.observer.events), 2)
 
97
        self.assertEqual(len(err), 1)
 
98
 
 
99
    def test_legacy_observer(self):
 
100
        """
 
101
        If a message is emitted by the new logging machinery, a legacy observer
 
102
        does not miss it.
 
103
        """
 
104
 
 
105
        observer = Observer()
 
106
        legacylog.addObserver(observer)
 
107
        self.addCleanup(legacylog.removeObserver, observer)
 
108
        logger.info(test_event)
 
109
        self.assertEqual(observer.events, [EventCmp(test_event)])
 
110
 
 
111
    def test_legacy_observer_ignore_debug(self):
 
112
        """
 
113
        By default all debug messages are filtered by the legacy observer.
 
114
        """
 
115
 
 
116
        observer = Observer()
 
117
        legacylog.addObserver(observer)
 
118
        self.addCleanup(legacylog.removeObserver, observer)
 
119
        logger.debug(test_event)
 
120
        self.assertEqual(observer.events, [])
 
121
 
 
122
 
 
123
class TestStdLogging(unittest.TestCase):
99
124
    """Test the integration with the standard logging module."""
100
125
 
101
 
    def _install_std_logging_observer(self):
 
126
    def setUp(self):
 
127
        self.observer = install_observer(self)
102
128
        root = logging.getLogger()
103
 
        handler = log.LoggingToNewLogginAdapter()
 
129
        handler = log.StdLoggingAdapter()
104
130
        root.addHandler(handler)
105
131
        self.addCleanup(root.removeHandler, handler)
106
132
 
107
133
    def test_std_logging_adapter(self):
108
134
        """Install and handler to the std's root logger."""
109
135
 
110
 
        ev = self.get_logger()
111
 
        self._install_std_logging_observer()
112
136
        try:
113
 
            raise Exception("test")
 
137
            raise RuntimeError("test")
114
138
        except:
115
139
            logging.exception("exp")
116
 
        errors = self.flushLoggedErrors(Exception)
117
 
        self.assertEqual(len(errors), 1)
118
 
        self.assertEqual(len(ev), 1)
119
 
        self.assertEqual(ev[0]["msg"], "exp")
 
140
        self.flushLoggedErrors(RuntimeError)
 
141
        self.assertEqual(len(self.observer.events), 1)
 
142
        self.assertEqual(self.observer.events[0]["log_format"].split("\n")[0], "exp")
 
143
 
 
144
    def test_event_has_log_id(self):
 
145
        """
 
146
        Test if events logged with the standard logging library have the
 
147
        'log_id' attribute.
 
148
        """
 
149
 
 
150
        logging.warn("test")
 
151
        self.assertEqual(len(self.observer.events), 1)
 
152
        self.assertIn("log_id", self.observer.events[0])