~mwinter4/maus/ckov_0_9_3

« back to all changes in this revision

Viewing changes to third_party/nose-0.11.3/lib/python/nose/plugins/logcapture.py

  • Committer: tunnell
  • Date: 2010-09-30 13:56:05 UTC
  • Revision ID: tunnell@itchy-20100930135605-wxbkfgy75p0sndk3
add third party

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
"""
 
2
This plugin captures logging statements issued during test execution. When an
 
3
error or failure occurs, the captured log messages are attached to the running
 
4
test in the test.capturedLogging attribute, and displayed with the error failure
 
5
output. It is enabled by default but can be turned off with the option
 
6
``--nologcapture``.
 
7
 
 
8
You can filter captured logging statements with the ``--logging-filter`` option. 
 
9
If set, it specifies which logger(s) will be captured; loggers that do not match
 
10
will be passed. Example: specifying ``--logging-filter=sqlalchemy,myapp``
 
11
will ensure that only statements logged via sqlalchemy.engine, myapp
 
12
or myapp.foo.bar logger will be logged.
 
13
 
 
14
You can remove other installed logging handlers with the
 
15
``--logging-clear-handlers`` option.
 
16
"""
 
17
 
 
18
import logging
 
19
from logging.handlers import BufferingHandler
 
20
import threading
 
21
 
 
22
from nose.plugins.base import Plugin
 
23
from nose.util import anyp, ln, safe_str
 
24
 
 
25
try:
 
26
    from cStringIO import StringIO
 
27
except ImportError:
 
28
    from StringIO import StringIO
 
29
 
 
30
log = logging.getLogger(__name__)
 
31
 
 
32
class FilterSet(object):
 
33
    def __init__(self, filter_components):
 
34
        self.inclusive, self.exclusive = self._partition(filter_components)
 
35
 
 
36
    # @staticmethod
 
37
    def _partition(components):
 
38
        inclusive, exclusive = [], []
 
39
        for component in components:
 
40
            if component.startswith('-'):
 
41
                exclusive.append(component[1:])
 
42
            else:
 
43
                inclusive.append(component)
 
44
        return inclusive, exclusive
 
45
    _partition = staticmethod(_partition)
 
46
 
 
47
    def allow(self, record):
 
48
        """returns whether this record should be printed"""
 
49
        if not self:
 
50
            # nothing to filter
 
51
            return True
 
52
        return self._allow(record) and not self._deny(record)
 
53
 
 
54
    # @staticmethod
 
55
    def _any_match(matchers, record):
 
56
        """return the bool of whether `record` starts with
 
57
        any item in `matchers`"""
 
58
        def record_matches_key(key):
 
59
            return record == key or record.startswith(key + '.')
 
60
        return anyp(bool, map(record_matches_key, matchers))
 
61
    _any_match = staticmethod(_any_match)
 
62
 
 
63
    def _allow(self, record):
 
64
        if not self.inclusive:
 
65
            return True
 
66
        return self._any_match(self.inclusive, record)
 
67
 
 
68
    def _deny(self, record):
 
69
        if not self.exclusive:
 
70
            return False
 
71
        return self._any_match(self.exclusive, record)
 
72
 
 
73
 
 
74
class MyMemoryHandler(BufferingHandler):
 
75
    def __init__(self, capacity, logformat, logdatefmt, filters):
 
76
        BufferingHandler.__init__(self, capacity)
 
77
        fmt = logging.Formatter(logformat, logdatefmt)
 
78
        self.setFormatter(fmt)
 
79
        self.filterset = FilterSet(filters)
 
80
    def flush(self):
 
81
        pass # do nothing
 
82
    def truncate(self):
 
83
        self.buffer = []
 
84
    def filter(self, record):
 
85
        return self.filterset.allow(record.name)
 
86
    def __getstate__(self):
 
87
        state = self.__dict__.copy()
 
88
        del state['lock']
 
89
        return state
 
90
    def __setstate__(self, state):
 
91
        self.__dict__.update(state)
 
92
        self.lock = threading.RLock()
 
93
 
 
94
 
 
95
class LogCapture(Plugin):
 
96
    """
 
97
    Log capture plugin. Enabled by default. Disable with --nologcapture.
 
98
    This plugin captures logging statements issued during test execution,
 
99
    appending any output captured to the error or failure output,
 
100
    should the test fail or raise an error.
 
101
    """
 
102
    enabled = True
 
103
    env_opt = 'NOSE_NOLOGCAPTURE'
 
104
    name = 'logcapture'
 
105
    score = 500
 
106
    logformat = '%(name)s: %(levelname)s: %(message)s'
 
107
    logdatefmt = None
 
108
    clear = False
 
109
    filters = ['-nose']
 
110
 
 
111
    def options(self, parser, env):
 
112
        """Register commandline options.
 
113
        """
 
114
        parser.add_option(
 
115
            "--nologcapture", action="store_false",
 
116
            default=not env.get(self.env_opt), dest="logcapture",
 
117
            help="Disable logging capture plugin. "
 
118
                 "Logging configurtion will be left intact."
 
119
                 " [NOSE_NOLOGCAPTURE]")
 
120
        parser.add_option(
 
121
            "--logging-format", action="store", dest="logcapture_format",
 
122
            default=env.get('NOSE_LOGFORMAT') or self.logformat,
 
123
            metavar="FORMAT",
 
124
            help="Specify custom format to print statements. "
 
125
                 "Uses the same format as used by standard logging handlers."
 
126
                 " [NOSE_LOGFORMAT]")
 
127
        parser.add_option(
 
128
            "--logging-datefmt", action="store", dest="logcapture_datefmt",
 
129
            default=env.get('NOSE_LOGDATEFMT') or self.logdatefmt,
 
130
            metavar="FORMAT",
 
131
            help="Specify custom date/time format to print statements. "
 
132
                 "Uses the same format as used by standard logging handlers."
 
133
                 " [NOSE_LOGDATEFMT]")
 
134
        parser.add_option(
 
135
            "--logging-filter", action="store", dest="logcapture_filters",
 
136
            default=env.get('NOSE_LOGFILTER'),
 
137
            metavar="FILTER",
 
138
            help="Specify which statements to filter in/out. "
 
139
                 "By default, everything is captured. If the output is too"
 
140
                 " verbose,\nuse this option to filter out needless output.\n"
 
141
                 "Example: filter=foo will capture statements issued ONLY to\n"
 
142
                 " foo or foo.what.ever.sub but not foobar or other logger.\n"
 
143
                 "Specify multiple loggers with comma: filter=foo,bar,baz.\n"
 
144
                 "If any logger name is prefixed with a minus, eg filter=-foo,\n"
 
145
                 "it will be excluded rather than included. Default: "
 
146
                 "exclude logging messages from nose itself (-nose)."
 
147
                 " [NOSE_LOGFILTER]\n")
 
148
        parser.add_option(
 
149
            "--logging-clear-handlers", action="store_true",
 
150
            default=False, dest="logcapture_clear",
 
151
            help="Clear all other logging handlers")
 
152
 
 
153
    def configure(self, options, conf):
 
154
        """Configure plugin.
 
155
        """
 
156
        self.conf = conf
 
157
        # Disable if explicitly disabled, or if logging is
 
158
        # configured via logging config file
 
159
        if not options.logcapture or conf.loggingConfig:
 
160
            self.enabled = False
 
161
        self.logformat = options.logcapture_format
 
162
        self.logdatefmt = options.logcapture_datefmt
 
163
        self.clear = options.logcapture_clear
 
164
        if options.logcapture_filters:
 
165
            self.filters = options.logcapture_filters.split(',')
 
166
 
 
167
    def setupLoghandler(self):
 
168
        # setup our handler with root logger
 
169
        root_logger = logging.getLogger()
 
170
        if self.clear:
 
171
            if hasattr(root_logger, "handlers"):
 
172
                for handler in root_logger.handlers:
 
173
                    root_logger.removeHandler(handler)
 
174
            for logger in logging.Logger.manager.loggerDict.values():
 
175
                if hasattr(logger, "handlers"):
 
176
                    for handler in logger.handlers:
 
177
                        logger.removeHandler(handler)
 
178
        # make sure there isn't one already
 
179
        # you can't simply use "if self.handler not in root_logger.handlers"
 
180
        # since at least in unit tests this doesn't work --
 
181
        # LogCapture() is instantiated for each test case while root_logger
 
182
        # is module global
 
183
        # so we always add new MyMemoryHandler instance
 
184
        for handler in root_logger.handlers[:]:
 
185
            if isinstance(handler, MyMemoryHandler):
 
186
                root_logger.handlers.remove(handler)
 
187
        root_logger.addHandler(self.handler)
 
188
        # to make sure everything gets captured
 
189
        root_logger.setLevel(logging.NOTSET)
 
190
 
 
191
    def begin(self):
 
192
        """Set up logging handler before test run begins.
 
193
        """
 
194
        self.start()
 
195
 
 
196
    def start(self):
 
197
        self.handler = MyMemoryHandler(1000, self.logformat, self.logdatefmt,
 
198
                                       self.filters)
 
199
        self.setupLoghandler()
 
200
 
 
201
    def end(self):
 
202
        pass
 
203
 
 
204
    def beforeTest(self, test):
 
205
        """Clear buffers and handlers before test.
 
206
        """
 
207
        self.setupLoghandler()
 
208
 
 
209
    def afterTest(self, test):
 
210
        """Clear buffers after test.
 
211
        """
 
212
        self.handler.truncate()
 
213
 
 
214
    def formatFailure(self, test, err):
 
215
        """Add captured log messages to failure output.
 
216
        """
 
217
        return self.formatError(test, err)
 
218
 
 
219
    def formatError(self, test, err):
 
220
        """Add captured log messages to error output.
 
221
        """
 
222
        # logic flow copied from Capture.formatError
 
223
        test.capturedLogging = records = self.formatLogRecords()
 
224
        if not records:
 
225
            return err
 
226
        ec, ev, tb = err
 
227
        return (ec, self.addCaptureToErr(ev, records), tb)
 
228
 
 
229
    def formatLogRecords(self):
 
230
        format = self.handler.format
 
231
        return [safe_str(format(r)) for r in self.handler.buffer]
 
232
 
 
233
    def addCaptureToErr(self, ev, records):
 
234
        return '\n'.join([safe_str(ev), ln('>> begin captured logging <<')] + \
 
235
                          records + \
 
236
                          [ln('>> end captured logging <<')])