~dingqi-lxb/percona-server/5.5_log_queries_in_memory

« back to all changes in this revision

Viewing changes to python-for-subunit2junitxml/subunit/__init__.py

  • Committer: Stewart Smith
  • Date: 2011-10-06 07:49:44 UTC
  • mfrom: (175.1.4 5.5)
  • Revision ID: stewart@flamingspork.com-20111006074944-xtwnccssnymjqfj0
Merge subunit support for mtr

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#
 
2
#  subunit: extensions to Python unittest to get test results from subprocesses.
 
3
#  Copyright (C) 2005  Robert Collins <robertc@robertcollins.net>
 
4
#
 
5
#  Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
 
6
#  license at the users choice. A copy of both licenses are available in the
 
7
#  project source as Apache-2.0 and BSD. You may not use this file except in
 
8
#  compliance with one of these two licences.
 
9
#
 
10
#  Unless required by applicable law or agreed to in writing, software
 
11
#  distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
 
12
#  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
 
13
#  license you chose for the specific language governing permissions and
 
14
#  limitations under that license.
 
15
#
 
16
 
 
17
"""Subunit - a streaming test protocol
 
18
 
 
19
Overview
 
20
++++++++
 
21
 
 
22
The ``subunit`` Python package provides a number of ``unittest`` extensions
 
23
which can be used to cause tests to output Subunit, to parse Subunit streams
 
24
into test activity, perform seamless test isolation within a regular test
 
25
case and variously sort, filter and report on test runs.
 
26
 
 
27
 
 
28
Key Classes
 
29
-----------
 
30
 
 
31
The ``subunit.TestProtocolClient`` class is a ``unittest.TestResult``
 
32
extension which will translate a test run into a Subunit stream.
 
33
 
 
34
The ``subunit.ProtocolTestCase`` class is an adapter between the Subunit wire
 
35
protocol and the ``unittest.TestCase`` object protocol. It is used to translate
 
36
a stream into a test run, which regular ``unittest.TestResult`` objects can
 
37
process and report/inspect.
 
38
 
 
39
Subunit has support for non-blocking usage too, for use with asyncore or
 
40
Twisted. See the ``TestProtocolServer`` parser class for more details.
 
41
 
 
42
Subunit includes extensions to the Python ``TestResult`` protocol. These are
 
43
all done in a compatible manner: ``TestResult`` objects that do not implement
 
44
the extension methods will not cause errors to be raised, instead the extension
 
45
will either lose fidelity (for instance, folding expected failures to success
 
46
in Python versions < 2.7 or 3.1), or discard the extended data (for extra
 
47
details, tags, timestamping and progress markers).
 
48
 
 
49
The test outcome methods ``addSuccess``, ``addError``, ``addExpectedFailure``,
 
50
``addFailure``, ``addSkip`` take an optional keyword parameter ``details``
 
51
which can be used instead of the usual python unittest parameter.
 
52
When used the value of details should be a dict from ``string`` to
 
53
``testtools.content.Content`` objects. This is a draft API being worked on with
 
54
the Python Testing In Python mail list, with the goal of permitting a common
 
55
way to provide additional data beyond a traceback, such as captured data from
 
56
disk, logging messages etc. The reference for this API is in testtools (0.9.0
 
57
and newer).
 
58
 
 
59
The ``tags(new_tags, gone_tags)`` method is called (if present) to add or
 
60
remove tags in the test run that is currently executing. If called when no
 
61
test is in progress (that is, if called outside of the ``startTest``,
 
62
``stopTest`` pair), the the tags apply to all sebsequent tests. If called
 
63
when a test is in progress, then the tags only apply to that test.
 
64
 
 
65
The ``time(a_datetime)`` method is called (if present) when a ``time:``
 
66
directive is encountered in a Subunit stream. This is used to tell a TestResult
 
67
about the time that events in the stream occured at, to allow reconstructing
 
68
test timing from a stream.
 
69
 
 
70
The ``progress(offset, whence)`` method controls progress data for a stream.
 
71
The offset parameter is an int, and whence is one of subunit.PROGRESS_CUR,
 
72
subunit.PROGRESS_SET, PROGRESS_PUSH, PROGRESS_POP. Push and pop operations
 
73
ignore the offset parameter.
 
74
 
 
75
 
 
76
Python test support
 
77
-------------------
 
78
 
 
79
``subunit.run`` is a convenience wrapper to run a Python test suite via
 
80
the command line, reporting via Subunit::
 
81
 
 
82
  $ python -m subunit.run mylib.tests.test_suite
 
83
 
 
84
The ``IsolatedTestSuite`` class is a TestSuite that forks before running its
 
85
tests, allowing isolation between the test runner and some tests.
 
86
 
 
87
Similarly, ``IsolatedTestCase`` is a base class which can be subclassed to get
 
88
tests that will fork() before that individual test is run.
 
89
 
 
90
`ExecTestCase`` is a convenience wrapper for running an external
 
91
program to get a Subunit stream and then report that back to an arbitrary
 
92
result object::
 
93
 
 
94
 class AggregateTests(subunit.ExecTestCase):
 
95
 
 
96
     def test_script_one(self):
 
97
         './bin/script_one'
 
98
 
 
99
     def test_script_two(self):
 
100
         './bin/script_two'
 
101
 
 
102
 # Normally your normal test loading would take of this automatically,
 
103
 # It is only spelt out in detail here for clarity.
 
104
 suite = unittest.TestSuite([AggregateTests("test_script_one"),
 
105
     AggregateTests("test_script_two")])
 
106
 # Create any TestResult class you like.
 
107
 result = unittest._TextTestResult(sys.stdout)
 
108
 # And run your suite as normal, Subunit will exec each external script as
 
109
 # needed and report to your result object.
 
110
 suite.run(result)
 
111
 
 
112
Utility modules
 
113
---------------
 
114
 
 
115
* subunit.chunked contains HTTP chunked encoding/decoding logic.
 
116
* subunit.test_results contains TestResult helper classes.
 
117
"""
 
118
 
 
119
import os
 
120
import re
 
121
import subprocess
 
122
import sys
 
123
import unittest
 
124
 
 
125
from testtools import content, content_type, ExtendedToOriginalDecorator
 
126
from testtools.compat import _b, _u, BytesIO, StringIO
 
127
try:
 
128
    from testtools.testresult.real import _StringException
 
129
    RemoteException = _StringException
 
130
    # For testing: different pythons have different str() implementations.
 
131
    if sys.version_info > (3, 0):
 
132
        _remote_exception_str = "testtools.testresult.real._StringException"
 
133
        _remote_exception_str_chunked = "34\r\n" + _remote_exception_str
 
134
    else:
 
135
        _remote_exception_str = "_StringException" 
 
136
        _remote_exception_str_chunked = "1A\r\n" + _remote_exception_str
 
137
except ImportError:
 
138
    raise ImportError ("testtools.testresult.real does not contain "
 
139
        "_StringException, check your version.")
 
140
from testtools import testresult
 
141
 
 
142
from subunit import chunked, details, iso8601, test_results
 
143
 
 
144
 
 
145
PROGRESS_SET = 0
 
146
PROGRESS_CUR = 1
 
147
PROGRESS_PUSH = 2
 
148
PROGRESS_POP = 3
 
149
 
 
150
 
 
151
def test_suite():
 
152
    import subunit.tests
 
153
    return subunit.tests.test_suite()
 
154
 
 
155
 
 
156
def join_dir(base_path, path):
 
157
    """
 
158
    Returns an absolute path to C{path}, calculated relative to the parent
 
159
    of C{base_path}.
 
160
 
 
161
    @param base_path: A path to a file or directory.
 
162
    @param path: An absolute path, or a path relative to the containing
 
163
    directory of C{base_path}.
 
164
 
 
165
    @return: An absolute path to C{path}.
 
166
    """
 
167
    return os.path.join(os.path.dirname(os.path.abspath(base_path)), path)
 
168
 
 
169
 
 
170
def tags_to_new_gone(tags):
 
171
    """Split a list of tags into a new_set and a gone_set."""
 
172
    new_tags = set()
 
173
    gone_tags = set()
 
174
    for tag in tags:
 
175
        if tag[0] == '-':
 
176
            gone_tags.add(tag[1:])
 
177
        else:
 
178
            new_tags.add(tag)
 
179
    return new_tags, gone_tags
 
180
 
 
181
 
 
182
class DiscardStream(object):
 
183
    """A filelike object which discards what is written to it."""
 
184
 
 
185
    def write(self, bytes):
 
186
        pass
 
187
 
 
188
 
 
189
class _ParserState(object):
 
190
    """State for the subunit parser."""
 
191
 
 
192
    def __init__(self, parser):
 
193
        self.parser = parser
 
194
        self._test_sym = (_b('test'), _b('testing'))
 
195
        self._colon_sym = _b(':')
 
196
        self._error_sym = (_b('error'),)
 
197
        self._failure_sym = (_b('failure'),)
 
198
        self._progress_sym = (_b('progress'),)
 
199
        self._skip_sym = _b('skip')
 
200
        self._success_sym = (_b('success'), _b('successful'))
 
201
        self._tags_sym = (_b('tags'),)
 
202
        self._time_sym = (_b('time'),)
 
203
        self._xfail_sym = (_b('xfail'),)
 
204
        self._uxsuccess_sym = (_b('uxsuccess'),)
 
205
        self._start_simple = _u(" [")
 
206
        self._start_multipart = _u(" [ multipart")
 
207
 
 
208
    def addError(self, offset, line):
 
209
        """An 'error:' directive has been read."""
 
210
        self.parser.stdOutLineReceived(line)
 
211
 
 
212
    def addExpectedFail(self, offset, line):
 
213
        """An 'xfail:' directive has been read."""
 
214
        self.parser.stdOutLineReceived(line)
 
215
 
 
216
    def addFailure(self, offset, line):
 
217
        """A 'failure:' directive has been read."""
 
218
        self.parser.stdOutLineReceived(line)
 
219
 
 
220
    def addSkip(self, offset, line):
 
221
        """A 'skip:' directive has been read."""
 
222
        self.parser.stdOutLineReceived(line)
 
223
 
 
224
    def addSuccess(self, offset, line):
 
225
        """A 'success:' directive has been read."""
 
226
        self.parser.stdOutLineReceived(line)
 
227
 
 
228
    def lineReceived(self, line):
 
229
        """a line has been received."""
 
230
        parts = line.split(None, 1)
 
231
        if len(parts) == 2 and line.startswith(parts[0]):
 
232
            cmd, rest = parts
 
233
            offset = len(cmd) + 1
 
234
            cmd = cmd.rstrip(self._colon_sym)
 
235
            if cmd in self._test_sym:
 
236
                self.startTest(offset, line)
 
237
            elif cmd in self._error_sym:
 
238
                self.addError(offset, line)
 
239
            elif cmd in self._failure_sym:
 
240
                self.addFailure(offset, line)
 
241
            elif cmd in self._progress_sym:
 
242
                self.parser._handleProgress(offset, line)
 
243
            elif cmd in self._skip_sym:
 
244
                self.addSkip(offset, line)
 
245
            elif cmd in self._success_sym:
 
246
                self.addSuccess(offset, line)
 
247
            elif cmd in self._tags_sym:
 
248
                self.parser._handleTags(offset, line)
 
249
                self.parser.subunitLineReceived(line)
 
250
            elif cmd in self._time_sym:
 
251
                self.parser._handleTime(offset, line)
 
252
                self.parser.subunitLineReceived(line)
 
253
            elif cmd in self._xfail_sym:
 
254
                self.addExpectedFail(offset, line)
 
255
            elif cmd in self._uxsuccess_sym:
 
256
                self.addUnexpectedSuccess(offset, line)
 
257
            else:
 
258
                self.parser.stdOutLineReceived(line)
 
259
        else:
 
260
            self.parser.stdOutLineReceived(line)
 
261
 
 
262
    def lostConnection(self):
 
263
        """Connection lost."""
 
264
        self.parser._lostConnectionInTest(_u('unknown state of '))
 
265
 
 
266
    def startTest(self, offset, line):
 
267
        """A test start command received."""
 
268
        self.parser.stdOutLineReceived(line)
 
269
 
 
270
 
 
271
class _InTest(_ParserState):
 
272
    """State for the subunit parser after reading a test: directive."""
 
273
 
 
274
    def _outcome(self, offset, line, no_details, details_state):
 
275
        """An outcome directive has been read.
 
276
 
 
277
        :param no_details: Callable to call when no details are presented.
 
278
        :param details_state: The state to switch to for details
 
279
            processing of this outcome.
 
280
        """
 
281
        test_name = line[offset:-1].decode('utf8')
 
282
        if self.parser.current_test_description == test_name:
 
283
            self.parser._state = self.parser._outside_test
 
284
            self.parser.current_test_description = None
 
285
            no_details()
 
286
            self.parser.client.stopTest(self.parser._current_test)
 
287
            self.parser._current_test = None
 
288
            self.parser.subunitLineReceived(line)
 
289
        elif self.parser.current_test_description + self._start_simple == \
 
290
            test_name:
 
291
            self.parser._state = details_state
 
292
            details_state.set_simple()
 
293
            self.parser.subunitLineReceived(line)
 
294
        elif self.parser.current_test_description + self._start_multipart == \
 
295
            test_name:
 
296
            self.parser._state = details_state
 
297
            details_state.set_multipart()
 
298
            self.parser.subunitLineReceived(line)
 
299
        else:
 
300
            self.parser.stdOutLineReceived(line)
 
301
 
 
302
    def _error(self):
 
303
        self.parser.client.addError(self.parser._current_test,
 
304
            details={})
 
305
 
 
306
    def addError(self, offset, line):
 
307
        """An 'error:' directive has been read."""
 
308
        self._outcome(offset, line, self._error,
 
309
            self.parser._reading_error_details)
 
310
 
 
311
    def _xfail(self):
 
312
        self.parser.client.addExpectedFailure(self.parser._current_test,
 
313
            details={})
 
314
 
 
315
    def addExpectedFail(self, offset, line):
 
316
        """An 'xfail:' directive has been read."""
 
317
        self._outcome(offset, line, self._xfail,
 
318
            self.parser._reading_xfail_details)
 
319
 
 
320
    def _uxsuccess(self):
 
321
        self.parser.client.addUnexpectedSuccess(self.parser._current_test)
 
322
 
 
323
    def addUnexpectedSuccess(self, offset, line):
 
324
        """A 'uxsuccess:' directive has been read."""
 
325
        self._outcome(offset, line, self._uxsuccess,
 
326
            self.parser._reading_uxsuccess_details)
 
327
 
 
328
    def _failure(self):
 
329
        self.parser.client.addFailure(self.parser._current_test, details={})
 
330
 
 
331
    def addFailure(self, offset, line):
 
332
        """A 'failure:' directive has been read."""
 
333
        self._outcome(offset, line, self._failure,
 
334
            self.parser._reading_failure_details)
 
335
 
 
336
    def _skip(self):
 
337
        self.parser.client.addSkip(self.parser._current_test, details={})
 
338
 
 
339
    def addSkip(self, offset, line):
 
340
        """A 'skip:' directive has been read."""
 
341
        self._outcome(offset, line, self._skip,
 
342
            self.parser._reading_skip_details)
 
343
 
 
344
    def _succeed(self):
 
345
        self.parser.client.addSuccess(self.parser._current_test, details={})
 
346
 
 
347
    def addSuccess(self, offset, line):
 
348
        """A 'success:' directive has been read."""
 
349
        self._outcome(offset, line, self._succeed,
 
350
            self.parser._reading_success_details)
 
351
 
 
352
    def lostConnection(self):
 
353
        """Connection lost."""
 
354
        self.parser._lostConnectionInTest(_u(''))
 
355
 
 
356
 
 
357
class _OutSideTest(_ParserState):
 
358
    """State for the subunit parser outside of a test context."""
 
359
 
 
360
    def lostConnection(self):
 
361
        """Connection lost."""
 
362
 
 
363
    def startTest(self, offset, line):
 
364
        """A test start command received."""
 
365
        self.parser._state = self.parser._in_test
 
366
        test_name = line[offset:-1].decode('utf8')
 
367
        self.parser._current_test = RemotedTestCase(test_name)
 
368
        self.parser.current_test_description = test_name
 
369
        self.parser.client.startTest(self.parser._current_test)
 
370
        self.parser.subunitLineReceived(line)
 
371
 
 
372
 
 
373
class _ReadingDetails(_ParserState):
 
374
    """Common logic for readin state details."""
 
375
 
 
376
    def endDetails(self):
 
377
        """The end of a details section has been reached."""
 
378
        self.parser._state = self.parser._outside_test
 
379
        self.parser.current_test_description = None
 
380
        self._report_outcome()
 
381
        self.parser.client.stopTest(self.parser._current_test)
 
382
 
 
383
    def lineReceived(self, line):
 
384
        """a line has been received."""
 
385
        self.details_parser.lineReceived(line)
 
386
        self.parser.subunitLineReceived(line)
 
387
 
 
388
    def lostConnection(self):
 
389
        """Connection lost."""
 
390
        self.parser._lostConnectionInTest(_u('%s report of ') %
 
391
            self._outcome_label())
 
392
 
 
393
    def _outcome_label(self):
 
394
        """The label to describe this outcome."""
 
395
        raise NotImplementedError(self._outcome_label)
 
396
 
 
397
    def set_simple(self):
 
398
        """Start a simple details parser."""
 
399
        self.details_parser = details.SimpleDetailsParser(self)
 
400
 
 
401
    def set_multipart(self):
 
402
        """Start a multipart details parser."""
 
403
        self.details_parser = details.MultipartDetailsParser(self)
 
404
 
 
405
 
 
406
class _ReadingFailureDetails(_ReadingDetails):
 
407
    """State for the subunit parser when reading failure details."""
 
408
 
 
409
    def _report_outcome(self):
 
410
        self.parser.client.addFailure(self.parser._current_test,
 
411
            details=self.details_parser.get_details())
 
412
 
 
413
    def _outcome_label(self):
 
414
        return "failure"
 
415
 
 
416
 
 
417
class _ReadingErrorDetails(_ReadingDetails):
 
418
    """State for the subunit parser when reading error details."""
 
419
 
 
420
    def _report_outcome(self):
 
421
        self.parser.client.addError(self.parser._current_test,
 
422
            details=self.details_parser.get_details())
 
423
 
 
424
    def _outcome_label(self):
 
425
        return "error"
 
426
 
 
427
 
 
428
class _ReadingExpectedFailureDetails(_ReadingDetails):
 
429
    """State for the subunit parser when reading xfail details."""
 
430
 
 
431
    def _report_outcome(self):
 
432
        self.parser.client.addExpectedFailure(self.parser._current_test,
 
433
            details=self.details_parser.get_details())
 
434
 
 
435
    def _outcome_label(self):
 
436
        return "xfail"
 
437
 
 
438
 
 
439
class _ReadingUnexpectedSuccessDetails(_ReadingDetails):
 
440
    """State for the subunit parser when reading uxsuccess details."""
 
441
 
 
442
    def _report_outcome(self):
 
443
        self.parser.client.addUnexpectedSuccess(self.parser._current_test,
 
444
            details=self.details_parser.get_details())
 
445
 
 
446
    def _outcome_label(self):
 
447
        return "uxsuccess"
 
448
 
 
449
 
 
450
class _ReadingSkipDetails(_ReadingDetails):
 
451
    """State for the subunit parser when reading skip details."""
 
452
 
 
453
    def _report_outcome(self):
 
454
        self.parser.client.addSkip(self.parser._current_test,
 
455
            details=self.details_parser.get_details("skip"))
 
456
 
 
457
    def _outcome_label(self):
 
458
        return "skip"
 
459
 
 
460
 
 
461
class _ReadingSuccessDetails(_ReadingDetails):
 
462
    """State for the subunit parser when reading success details."""
 
463
 
 
464
    def _report_outcome(self):
 
465
        self.parser.client.addSuccess(self.parser._current_test,
 
466
            details=self.details_parser.get_details("success"))
 
467
 
 
468
    def _outcome_label(self):
 
469
        return "success"
 
470
 
 
471
 
 
472
class TestProtocolServer(object):
 
473
    """A parser for subunit.
 
474
 
 
475
    :ivar tags: The current tags associated with the protocol stream.
 
476
    """
 
477
 
 
478
    def __init__(self, client, stream=None, forward_stream=None):
 
479
        """Create a TestProtocolServer instance.
 
480
 
 
481
        :param client: An object meeting the unittest.TestResult protocol.
 
482
        :param stream: The stream that lines received which are not part of the
 
483
            subunit protocol should be written to. This allows custom handling
 
484
            of mixed protocols. By default, sys.stdout will be used for
 
485
            convenience. It should accept bytes to its write() method.
 
486
        :param forward_stream: A stream to forward subunit lines to. This
 
487
            allows a filter to forward the entire stream while still parsing
 
488
            and acting on it. By default forward_stream is set to
 
489
            DiscardStream() and no forwarding happens.
 
490
        """
 
491
        self.client = ExtendedToOriginalDecorator(client)
 
492
        if stream is None:
 
493
            stream = sys.stdout
 
494
            if sys.version_info > (3, 0):
 
495
                stream = stream.buffer
 
496
        self._stream = stream
 
497
        self._forward_stream = forward_stream or DiscardStream()
 
498
        # state objects we can switch too
 
499
        self._in_test = _InTest(self)
 
500
        self._outside_test = _OutSideTest(self)
 
501
        self._reading_error_details = _ReadingErrorDetails(self)
 
502
        self._reading_failure_details = _ReadingFailureDetails(self)
 
503
        self._reading_skip_details = _ReadingSkipDetails(self)
 
504
        self._reading_success_details = _ReadingSuccessDetails(self)
 
505
        self._reading_xfail_details = _ReadingExpectedFailureDetails(self)
 
506
        self._reading_uxsuccess_details = _ReadingUnexpectedSuccessDetails(self)
 
507
        # start with outside test.
 
508
        self._state = self._outside_test
 
509
        # Avoid casts on every call
 
510
        self._plusminus = _b('+-')
 
511
        self._push_sym = _b('push')
 
512
        self._pop_sym = _b('pop')
 
513
 
 
514
    def _handleProgress(self, offset, line):
 
515
        """Process a progress directive."""
 
516
        line = line[offset:].strip()
 
517
        if line[0] in self._plusminus:
 
518
            whence = PROGRESS_CUR
 
519
            delta = int(line)
 
520
        elif line == self._push_sym:
 
521
            whence = PROGRESS_PUSH
 
522
            delta = None
 
523
        elif line == self._pop_sym:
 
524
            whence = PROGRESS_POP
 
525
            delta = None
 
526
        else:
 
527
            whence = PROGRESS_SET
 
528
            delta = int(line)
 
529
        self.client.progress(delta, whence)
 
530
 
 
531
    def _handleTags(self, offset, line):
 
532
        """Process a tags command."""
 
533
        tags = line[offset:].decode('utf8').split()
 
534
        new_tags, gone_tags = tags_to_new_gone(tags)
 
535
        self.client.tags(new_tags, gone_tags)
 
536
 
 
537
    def _handleTime(self, offset, line):
 
538
        # Accept it, but do not do anything with it yet.
 
539
        try:
 
540
            event_time = iso8601.parse_date(line[offset:-1])
 
541
        except TypeError:
 
542
            raise TypeError(_u("Failed to parse %r, got %r")
 
543
                % (line, sys.exec_info[1]))
 
544
        self.client.time(event_time)
 
545
 
 
546
    def lineReceived(self, line):
 
547
        """Call the appropriate local method for the received line."""
 
548
        self._state.lineReceived(line)
 
549
 
 
550
    def _lostConnectionInTest(self, state_string):
 
551
        error_string = _u("lost connection during %stest '%s'") % (
 
552
            state_string, self.current_test_description)
 
553
        self.client.addError(self._current_test, RemoteError(error_string))
 
554
        self.client.stopTest(self._current_test)
 
555
 
 
556
    def lostConnection(self):
 
557
        """The input connection has finished."""
 
558
        self._state.lostConnection()
 
559
 
 
560
    def readFrom(self, pipe):
 
561
        """Blocking convenience API to parse an entire stream.
 
562
 
 
563
        :param pipe: A file-like object supporting readlines().
 
564
        :return: None.
 
565
        """
 
566
        for line in pipe.readlines():
 
567
            self.lineReceived(line)
 
568
        self.lostConnection()
 
569
 
 
570
    def _startTest(self, offset, line):
 
571
        """Internal call to change state machine. Override startTest()."""
 
572
        self._state.startTest(offset, line)
 
573
 
 
574
    def subunitLineReceived(self, line):
 
575
        self._forward_stream.write(line)
 
576
 
 
577
    def stdOutLineReceived(self, line):
 
578
        self._stream.write(line)
 
579
 
 
580
 
 
581
class TestProtocolClient(testresult.TestResult):
 
582
    """A TestResult which generates a subunit stream for a test run.
 
583
 
 
584
    # Get a TestSuite or TestCase to run
 
585
    suite = make_suite()
 
586
    # Create a stream (any object with a 'write' method). This should accept
 
587
    # bytes not strings: subunit is a byte orientated protocol.
 
588
    stream = file('tests.log', 'wb')
 
589
    # Create a subunit result object which will output to the stream
 
590
    result = subunit.TestProtocolClient(stream)
 
591
    # Optionally, to get timing data for performance analysis, wrap the
 
592
    # serialiser with a timing decorator
 
593
    result = subunit.test_results.AutoTimingTestResultDecorator(result)
 
594
    # Run the test suite reporting to the subunit result object
 
595
    suite.run(result)
 
596
    # Close the stream.
 
597
    stream.close()
 
598
    """
 
599
 
 
600
    def __init__(self, stream):
 
601
        testresult.TestResult.__init__(self)
 
602
        self._stream = stream
 
603
        _make_stream_binary(stream)
 
604
        self._progress_fmt = _b("progress: ")
 
605
        self._bytes_eol = _b("\n")
 
606
        self._progress_plus = _b("+")
 
607
        self._progress_push = _b("push")
 
608
        self._progress_pop = _b("pop")
 
609
        self._empty_bytes = _b("")
 
610
        self._start_simple = _b(" [\n")
 
611
        self._end_simple = _b("]\n")
 
612
 
 
613
    def addError(self, test, error=None, details=None):
 
614
        """Report an error in test test.
 
615
 
 
616
        Only one of error and details should be provided: conceptually there
 
617
        are two separate methods:
 
618
            addError(self, test, error)
 
619
            addError(self, test, details)
 
620
 
 
621
        :param error: Standard unittest positional argument form - an
 
622
            exc_info tuple.
 
623
        :param details: New Testing-in-python drafted API; a dict from string
 
624
            to subunit.Content objects.
 
625
        """
 
626
        self._addOutcome("error", test, error=error, details=details)
 
627
 
 
628
    def addExpectedFailure(self, test, error=None, details=None):
 
629
        """Report an expected failure in test test.
 
630
 
 
631
        Only one of error and details should be provided: conceptually there
 
632
        are two separate methods:
 
633
            addError(self, test, error)
 
634
            addError(self, test, details)
 
635
 
 
636
        :param error: Standard unittest positional argument form - an
 
637
            exc_info tuple.
 
638
        :param details: New Testing-in-python drafted API; a dict from string
 
639
            to subunit.Content objects.
 
640
        """
 
641
        self._addOutcome("xfail", test, error=error, details=details)
 
642
 
 
643
    def addFailure(self, test, error=None, details=None):
 
644
        """Report a failure in test test.
 
645
 
 
646
        Only one of error and details should be provided: conceptually there
 
647
        are two separate methods:
 
648
            addFailure(self, test, error)
 
649
            addFailure(self, test, details)
 
650
 
 
651
        :param error: Standard unittest positional argument form - an
 
652
            exc_info tuple.
 
653
        :param details: New Testing-in-python drafted API; a dict from string
 
654
            to subunit.Content objects.
 
655
        """
 
656
        self._addOutcome("failure", test, error=error, details=details)
 
657
 
 
658
    def _addOutcome(self, outcome, test, error=None, details=None,
 
659
        error_permitted=True):
 
660
        """Report a failure in test test.
 
661
 
 
662
        Only one of error and details should be provided: conceptually there
 
663
        are two separate methods:
 
664
            addOutcome(self, test, error)
 
665
            addOutcome(self, test, details)
 
666
 
 
667
        :param outcome: A string describing the outcome - used as the
 
668
            event name in the subunit stream.
 
669
        :param error: Standard unittest positional argument form - an
 
670
            exc_info tuple.
 
671
        :param details: New Testing-in-python drafted API; a dict from string
 
672
            to subunit.Content objects.
 
673
        :param error_permitted: If True then one and only one of error or
 
674
            details must be supplied. If False then error must not be supplied
 
675
            and details is still optional.  """
 
676
        self._stream.write(_b("%s: %s" % (outcome, test.id())))
 
677
        if error_permitted:
 
678
            if error is None and details is None:
 
679
                raise ValueError
 
680
        else:
 
681
            if error is not None:
 
682
                raise ValueError
 
683
        if error is not None:
 
684
            self._stream.write(self._start_simple)
 
685
            # XXX: this needs to be made much stricter, along the lines of
 
686
            # Martin[gz]'s work in testtools. Perhaps subunit can use that?
 
687
            for line in self._exc_info_to_unicode(error, test).splitlines():
 
688
                self._stream.write(("%s\n" % line).encode('utf8'))
 
689
        elif details is not None:
 
690
            self._write_details(details)
 
691
        else:
 
692
            self._stream.write(_b("\n"))
 
693
        if details is not None or error is not None:
 
694
            self._stream.write(self._end_simple)
 
695
 
 
696
    def addSkip(self, test, reason=None, details=None):
 
697
        """Report a skipped test."""
 
698
        if reason is None:
 
699
            self._addOutcome("skip", test, error=None, details=details)
 
700
        else:
 
701
            self._stream.write(_b("skip: %s [\n" % test.id()))
 
702
            self._stream.write(_b("%s\n" % reason))
 
703
            self._stream.write(self._end_simple)
 
704
 
 
705
    def addSuccess(self, test, details=None):
 
706
        """Report a success in a test."""
 
707
        self._addOutcome("successful", test, details=details, error_permitted=False)
 
708
 
 
709
    def addUnexpectedSuccess(self, test, details=None):
 
710
        """Report an unexpected success in test test.
 
711
 
 
712
        Details can optionally be provided: conceptually there
 
713
        are two separate methods:
 
714
            addError(self, test)
 
715
            addError(self, test, details)
 
716
 
 
717
        :param details: New Testing-in-python drafted API; a dict from string
 
718
            to subunit.Content objects.
 
719
        """
 
720
        self._addOutcome("uxsuccess", test, details=details,
 
721
            error_permitted=False)
 
722
 
 
723
    def startTest(self, test):
 
724
        """Mark a test as starting its test run."""
 
725
        super(TestProtocolClient, self).startTest(test)
 
726
        self._stream.write(_b("test: %s\n" % test.id()))
 
727
        self._stream.flush()
 
728
 
 
729
    def stopTest(self, test):
 
730
        super(TestProtocolClient, self).stopTest(test)
 
731
        self._stream.flush()
 
732
 
 
733
    def progress(self, offset, whence):
 
734
        """Provide indication about the progress/length of the test run.
 
735
 
 
736
        :param offset: Information about the number of tests remaining. If
 
737
            whence is PROGRESS_CUR, then offset increases/decreases the
 
738
            remaining test count. If whence is PROGRESS_SET, then offset
 
739
            specifies exactly the remaining test count.
 
740
        :param whence: One of PROGRESS_CUR, PROGRESS_SET, PROGRESS_PUSH,
 
741
            PROGRESS_POP.
 
742
        """
 
743
        if whence == PROGRESS_CUR and offset > -1:
 
744
            prefix = self._progress_plus
 
745
            offset = _b(str(offset))
 
746
        elif whence == PROGRESS_PUSH:
 
747
            prefix = self._empty_bytes
 
748
            offset = self._progress_push
 
749
        elif whence == PROGRESS_POP:
 
750
            prefix = self._empty_bytes
 
751
            offset = self._progress_pop
 
752
        else:
 
753
            prefix = self._empty_bytes
 
754
            offset = _b(str(offset))
 
755
        self._stream.write(self._progress_fmt + prefix + offset +
 
756
            self._bytes_eol)
 
757
 
 
758
    def time(self, a_datetime):
 
759
        """Inform the client of the time.
 
760
 
 
761
        ":param datetime: A datetime.datetime object.
 
762
        """
 
763
        time = a_datetime.astimezone(iso8601.Utc())
 
764
        self._stream.write(_b("time: %04d-%02d-%02d %02d:%02d:%02d.%06dZ\n" % (
 
765
            time.year, time.month, time.day, time.hour, time.minute,
 
766
            time.second, time.microsecond)))
 
767
 
 
768
    def _write_details(self, details):
 
769
        """Output details to the stream.
 
770
 
 
771
        :param details: An extended details dict for a test outcome.
 
772
        """
 
773
        self._stream.write(_b(" [ multipart\n"))
 
774
        for name, content in sorted(details.items()):
 
775
            self._stream.write(_b("Content-Type: %s/%s" %
 
776
                (content.content_type.type, content.content_type.subtype)))
 
777
            parameters = content.content_type.parameters
 
778
            if parameters:
 
779
                self._stream.write(_b(";"))
 
780
                param_strs = []
 
781
                for param, value in parameters.items():
 
782
                    param_strs.append("%s=%s" % (param, value))
 
783
                self._stream.write(_b(",".join(param_strs)))
 
784
            self._stream.write(_b("\n%s\n" % name))
 
785
            encoder = chunked.Encoder(self._stream)
 
786
            list(map(encoder.write, content.iter_bytes()))
 
787
            encoder.close()
 
788
 
 
789
    def done(self):
 
790
        """Obey the testtools result.done() interface."""
 
791
 
 
792
 
 
793
def RemoteError(description=_u("")):
 
794
    return (_StringException, _StringException(description), None)
 
795
 
 
796
 
 
797
class RemotedTestCase(unittest.TestCase):
 
798
    """A class to represent test cases run in child processes.
 
799
 
 
800
    Instances of this class are used to provide the Python test API a TestCase
 
801
    that can be printed to the screen, introspected for metadata and so on.
 
802
    However, as they are a simply a memoisation of a test that was actually
 
803
    run in the past by a separate process, they cannot perform any interactive
 
804
    actions.
 
805
    """
 
806
 
 
807
    def __eq__ (self, other):
 
808
        try:
 
809
            return self.__description == other.__description
 
810
        except AttributeError:
 
811
            return False
 
812
 
 
813
    def __init__(self, description):
 
814
        """Create a psuedo test case with description description."""
 
815
        self.__description = description
 
816
 
 
817
    def error(self, label):
 
818
        raise NotImplementedError("%s on RemotedTestCases is not permitted." %
 
819
            label)
 
820
 
 
821
    def setUp(self):
 
822
        self.error("setUp")
 
823
 
 
824
    def tearDown(self):
 
825
        self.error("tearDown")
 
826
 
 
827
    def shortDescription(self):
 
828
        return self.__description
 
829
 
 
830
    def id(self):
 
831
        return "%s" % (self.__description,)
 
832
 
 
833
    def __str__(self):
 
834
        return "%s (%s)" % (self.__description, self._strclass())
 
835
 
 
836
    def __repr__(self):
 
837
        return "<%s description='%s'>" % \
 
838
               (self._strclass(), self.__description)
 
839
 
 
840
    def run(self, result=None):
 
841
        if result is None: result = self.defaultTestResult()
 
842
        result.startTest(self)
 
843
        result.addError(self, RemoteError(_u("Cannot run RemotedTestCases.\n")))
 
844
        result.stopTest(self)
 
845
 
 
846
    def _strclass(self):
 
847
        cls = self.__class__
 
848
        return "%s.%s" % (cls.__module__, cls.__name__)
 
849
 
 
850
 
 
851
class ExecTestCase(unittest.TestCase):
 
852
    """A test case which runs external scripts for test fixtures."""
 
853
 
 
854
    def __init__(self, methodName='runTest'):
 
855
        """Create an instance of the class that will use the named test
 
856
           method when executed. Raises a ValueError if the instance does
 
857
           not have a method with the specified name.
 
858
        """
 
859
        unittest.TestCase.__init__(self, methodName)
 
860
        testMethod = getattr(self, methodName)
 
861
        self.script = join_dir(sys.modules[self.__class__.__module__].__file__,
 
862
                               testMethod.__doc__)
 
863
 
 
864
    def countTestCases(self):
 
865
        return 1
 
866
 
 
867
    def run(self, result=None):
 
868
        if result is None: result = self.defaultTestResult()
 
869
        self._run(result)
 
870
 
 
871
    def debug(self):
 
872
        """Run the test without collecting errors in a TestResult"""
 
873
        self._run(testresult.TestResult())
 
874
 
 
875
    def _run(self, result):
 
876
        protocol = TestProtocolServer(result)
 
877
        process = subprocess.Popen(self.script, shell=True,
 
878
            stdout=subprocess.PIPE)
 
879
        _make_stream_binary(process.stdout)
 
880
        output = process.communicate()[0]
 
881
        protocol.readFrom(BytesIO(output))
 
882
 
 
883
 
 
884
class IsolatedTestCase(unittest.TestCase):
 
885
    """A TestCase which executes in a forked process.
 
886
 
 
887
    Each test gets its own process, which has a performance overhead but will
 
888
    provide excellent isolation from global state (such as django configs,
 
889
    zope utilities and so on).
 
890
    """
 
891
 
 
892
    def run(self, result=None):
 
893
        if result is None: result = self.defaultTestResult()
 
894
        run_isolated(unittest.TestCase, self, result)
 
895
 
 
896
 
 
897
class IsolatedTestSuite(unittest.TestSuite):
 
898
    """A TestSuite which runs its tests in a forked process.
 
899
 
 
900
    This decorator that will fork() before running the tests and report the
 
901
    results from the child process using a Subunit stream.  This is useful for
 
902
    handling tests that mutate global state, or are testing C extensions that
 
903
    could crash the VM.
 
904
    """
 
905
 
 
906
    def run(self, result=None):
 
907
        if result is None: result = testresult.TestResult()
 
908
        run_isolated(unittest.TestSuite, self, result)
 
909
 
 
910
 
 
911
def run_isolated(klass, self, result):
 
912
    """Run a test suite or case in a subprocess, using the run method on klass.
 
913
    """
 
914
    c2pread, c2pwrite = os.pipe()
 
915
    # fixme - error -> result
 
916
    # now fork
 
917
    pid = os.fork()
 
918
    if pid == 0:
 
919
        # Child
 
920
        # Close parent's pipe ends
 
921
        os.close(c2pread)
 
922
        # Dup fds for child
 
923
        os.dup2(c2pwrite, 1)
 
924
        # Close pipe fds.
 
925
        os.close(c2pwrite)
 
926
 
 
927
        # at this point, sys.stdin is redirected, now we want
 
928
        # to filter it to escape ]'s.
 
929
        ### XXX: test and write that bit.
 
930
        stream = os.fdopen(1, 'wb')
 
931
        result = TestProtocolClient(stream)
 
932
        klass.run(self, result)
 
933
        stream.flush()
 
934
        sys.stderr.flush()
 
935
        # exit HARD, exit NOW.
 
936
        os._exit(0)
 
937
    else:
 
938
        # Parent
 
939
        # Close child pipe ends
 
940
        os.close(c2pwrite)
 
941
        # hookup a protocol engine
 
942
        protocol = TestProtocolServer(result)
 
943
        fileobj = os.fdopen(c2pread, 'rb')
 
944
        protocol.readFrom(fileobj)
 
945
        os.waitpid(pid, 0)
 
946
        # TODO return code evaluation.
 
947
    return result
 
948
 
 
949
 
 
950
def TAP2SubUnit(tap, subunit):
 
951
    """Filter a TAP pipe into a subunit pipe.
 
952
 
 
953
    :param tap: A tap pipe/stream/file object.
 
954
    :param subunit: A pipe/stream/file object to write subunit results to.
 
955
    :return: The exit code to exit with.
 
956
    """
 
957
    BEFORE_PLAN = 0
 
958
    AFTER_PLAN = 1
 
959
    SKIP_STREAM = 2
 
960
    state = BEFORE_PLAN
 
961
    plan_start = 1
 
962
    plan_stop = 0
 
963
    def _skipped_test(subunit, plan_start):
 
964
        # Some tests were skipped.
 
965
        subunit.write('test test %d\n' % plan_start)
 
966
        subunit.write('error test %d [\n' % plan_start)
 
967
        subunit.write('test missing from TAP output\n')
 
968
        subunit.write(']\n')
 
969
        return plan_start + 1
 
970
    # Test data for the next test to emit
 
971
    test_name = None
 
972
    log = []
 
973
    result = None
 
974
    def _emit_test():
 
975
        "write out a test"
 
976
        if test_name is None:
 
977
            return
 
978
        subunit.write("test %s\n" % test_name)
 
979
        if not log:
 
980
            subunit.write("%s %s\n" % (result, test_name))
 
981
        else:
 
982
            subunit.write("%s %s [\n" % (result, test_name))
 
983
        if log:
 
984
            for line in log:
 
985
                subunit.write("%s\n" % line)
 
986
            subunit.write("]\n")
 
987
        del log[:]
 
988
    for line in tap:
 
989
        if state == BEFORE_PLAN:
 
990
            match = re.match("(\d+)\.\.(\d+)\s*(?:\#\s+(.*))?\n", line)
 
991
            if match:
 
992
                state = AFTER_PLAN
 
993
                _, plan_stop, comment = match.groups()
 
994
                plan_stop = int(plan_stop)
 
995
                if plan_start > plan_stop and plan_stop == 0:
 
996
                    # skipped file
 
997
                    state = SKIP_STREAM
 
998
                    subunit.write("test file skip\n")
 
999
                    subunit.write("skip file skip [\n")
 
1000
                    subunit.write("%s\n" % comment)
 
1001
                    subunit.write("]\n")
 
1002
                continue
 
1003
        # not a plan line, or have seen one before
 
1004
        match = re.match("(ok|not ok)(?:\s+(\d+)?)?(?:\s+([^#]*[^#\s]+)\s*)?(?:\s+#\s+(TODO|SKIP|skip|todo)(?:\s+(.*))?)?\n", line)
 
1005
        if match:
 
1006
            # new test, emit current one.
 
1007
            _emit_test()
 
1008
            status, number, description, directive, directive_comment = match.groups()
 
1009
            if status == 'ok':
 
1010
                result = 'success'
 
1011
            else:
 
1012
                result = "failure"
 
1013
            if description is None:
 
1014
                description = ''
 
1015
            else:
 
1016
                description = ' ' + description
 
1017
            if directive is not None:
 
1018
                if directive.upper() == 'TODO':
 
1019
                    result = 'xfail'
 
1020
                elif directive.upper() == 'SKIP':
 
1021
                    result = 'skip'
 
1022
                if directive_comment is not None:
 
1023
                    log.append(directive_comment)
 
1024
            if number is not None:
 
1025
                number = int(number)
 
1026
                while plan_start < number:
 
1027
                    plan_start = _skipped_test(subunit, plan_start)
 
1028
            test_name = "test %d%s" % (plan_start, description)
 
1029
            plan_start += 1
 
1030
            continue
 
1031
        match = re.match("Bail out\!(?:\s*(.*))?\n", line)
 
1032
        if match:
 
1033
            reason, = match.groups()
 
1034
            if reason is None:
 
1035
                extra = ''
 
1036
            else:
 
1037
                extra = ' %s' % reason
 
1038
            _emit_test()
 
1039
            test_name = "Bail out!%s" % extra
 
1040
            result = "error"
 
1041
            state = SKIP_STREAM
 
1042
            continue
 
1043
        match = re.match("\#.*\n", line)
 
1044
        if match:
 
1045
            log.append(line[:-1])
 
1046
            continue
 
1047
        subunit.write(line)
 
1048
    _emit_test()
 
1049
    while plan_start <= plan_stop:
 
1050
        # record missed tests
 
1051
        plan_start = _skipped_test(subunit, plan_start)
 
1052
    return 0
 
1053
 
 
1054
 
 
1055
def tag_stream(original, filtered, tags):
 
1056
    """Alter tags on a stream.
 
1057
 
 
1058
    :param original: The input stream.
 
1059
    :param filtered: The output stream.
 
1060
    :param tags: The tags to apply. As in a normal stream - a list of 'TAG' or
 
1061
        '-TAG' commands.
 
1062
 
 
1063
        A 'TAG' command will add the tag to the output stream,
 
1064
        and override any existing '-TAG' command in that stream.
 
1065
        Specifically:
 
1066
         * A global 'tags: TAG' will be added to the start of the stream.
 
1067
         * Any tags commands with -TAG will have the -TAG removed.
 
1068
 
 
1069
        A '-TAG' command will remove the TAG command from the stream.
 
1070
        Specifically:
 
1071
         * A 'tags: -TAG' command will be added to the start of the stream.
 
1072
         * Any 'tags: TAG' command will have 'TAG' removed from it.
 
1073
        Additionally, any redundant tagging commands (adding a tag globally
 
1074
        present, or removing a tag globally removed) are stripped as a
 
1075
        by-product of the filtering.
 
1076
    :return: 0
 
1077
    """
 
1078
    new_tags, gone_tags = tags_to_new_gone(tags)
 
1079
    def write_tags(new_tags, gone_tags):
 
1080
        if new_tags or gone_tags:
 
1081
            filtered.write("tags: " + ' '.join(new_tags))
 
1082
            if gone_tags:
 
1083
                for tag in gone_tags:
 
1084
                    filtered.write("-" + tag)
 
1085
            filtered.write("\n")
 
1086
    write_tags(new_tags, gone_tags)
 
1087
    # TODO: use the protocol parser and thus don't mangle test comments.
 
1088
    for line in original:
 
1089
        if line.startswith("tags:"):
 
1090
            line_tags = line[5:].split()
 
1091
            line_new, line_gone = tags_to_new_gone(line_tags)
 
1092
            line_new = line_new - gone_tags
 
1093
            line_gone = line_gone - new_tags
 
1094
            write_tags(line_new, line_gone)
 
1095
        else:
 
1096
            filtered.write(line)
 
1097
    return 0
 
1098
 
 
1099
 
 
1100
class ProtocolTestCase(object):
 
1101
    """Subunit wire protocol to unittest.TestCase adapter.
 
1102
 
 
1103
    ProtocolTestCase honours the core of ``unittest.TestCase`` protocol -
 
1104
    calling a ProtocolTestCase or invoking the run() method will make a 'test
 
1105
    run' happen. The 'test run' will simply be a replay of the test activity
 
1106
    that has been encoded into the stream. The ``unittest.TestCase`` ``debug``
 
1107
    and ``countTestCases`` methods are not supported because there isn't a
 
1108
    sensible mapping for those methods.
 
1109
 
 
1110
    # Get a stream (any object with a readline() method), in this case the
 
1111
    # stream output by the example from ``subunit.TestProtocolClient``.
 
1112
    stream = file('tests.log', 'rb')
 
1113
    # Create a parser which will read from the stream and emit
 
1114
    # activity to a unittest.TestResult when run() is called.
 
1115
    suite = subunit.ProtocolTestCase(stream)
 
1116
    # Create a result object to accept the contents of that stream.
 
1117
    result = unittest._TextTestResult(sys.stdout)
 
1118
    # 'run' the tests - process the stream and feed its contents to result.
 
1119
    suite.run(result)
 
1120
    stream.close()
 
1121
 
 
1122
    :seealso: TestProtocolServer (the subunit wire protocol parser).
 
1123
    """
 
1124
 
 
1125
    def __init__(self, stream, passthrough=None, forward=False):
 
1126
        """Create a ProtocolTestCase reading from stream.
 
1127
 
 
1128
        :param stream: A filelike object which a subunit stream can be read
 
1129
            from.
 
1130
        :param passthrough: A stream pass non subunit input on to. If not
 
1131
            supplied, the TestProtocolServer default is used.
 
1132
        :param forward: A stream to pass subunit input on to. If not supplied
 
1133
            subunit input is not forwarded.
 
1134
        """
 
1135
        self._stream = stream
 
1136
        _make_stream_binary(stream)
 
1137
        self._passthrough = passthrough
 
1138
        self._forward = forward
 
1139
 
 
1140
    def __call__(self, result=None):
 
1141
        return self.run(result)
 
1142
 
 
1143
    def run(self, result=None):
 
1144
        if result is None:
 
1145
            result = self.defaultTestResult()
 
1146
        protocol = TestProtocolServer(result, self._passthrough, self._forward)
 
1147
        line = self._stream.readline()
 
1148
        while line:
 
1149
            protocol.lineReceived(line)
 
1150
            line = self._stream.readline()
 
1151
        protocol.lostConnection()
 
1152
 
 
1153
 
 
1154
class TestResultStats(testresult.TestResult):
 
1155
    """A pyunit TestResult interface implementation for making statistics.
 
1156
 
 
1157
    :ivar total_tests: The total tests seen.
 
1158
    :ivar passed_tests: The tests that passed.
 
1159
    :ivar failed_tests: The tests that failed.
 
1160
    :ivar seen_tags: The tags seen across all tests.
 
1161
    """
 
1162
 
 
1163
    def __init__(self, stream):
 
1164
        """Create a TestResultStats which outputs to stream."""
 
1165
        testresult.TestResult.__init__(self)
 
1166
        self._stream = stream
 
1167
        self.failed_tests = 0
 
1168
        self.skipped_tests = 0
 
1169
        self.seen_tags = set()
 
1170
 
 
1171
    @property
 
1172
    def total_tests(self):
 
1173
        return self.testsRun
 
1174
 
 
1175
    def addError(self, test, err, details=None):
 
1176
        self.failed_tests += 1
 
1177
 
 
1178
    def addFailure(self, test, err, details=None):
 
1179
        self.failed_tests += 1
 
1180
 
 
1181
    def addSkip(self, test, reason, details=None):
 
1182
        self.skipped_tests += 1
 
1183
 
 
1184
    def formatStats(self):
 
1185
        self._stream.write("Total tests:   %5d\n" % self.total_tests)
 
1186
        self._stream.write("Passed tests:  %5d\n" % self.passed_tests)
 
1187
        self._stream.write("Failed tests:  %5d\n" % self.failed_tests)
 
1188
        self._stream.write("Skipped tests: %5d\n" % self.skipped_tests)
 
1189
        tags = sorted(self.seen_tags)
 
1190
        self._stream.write("Seen tags: %s\n" % (", ".join(tags)))
 
1191
 
 
1192
    @property
 
1193
    def passed_tests(self):
 
1194
        return self.total_tests - self.failed_tests - self.skipped_tests
 
1195
 
 
1196
    def tags(self, new_tags, gone_tags):
 
1197
        """Accumulate the seen tags."""
 
1198
        self.seen_tags.update(new_tags)
 
1199
 
 
1200
    def wasSuccessful(self):
 
1201
        """Tells whether or not this result was a success"""
 
1202
        return self.failed_tests == 0
 
1203
 
 
1204
 
 
1205
def get_default_formatter():
 
1206
    """Obtain the default formatter to write to.
 
1207
 
 
1208
    :return: A file-like object.
 
1209
    """
 
1210
    formatter = os.getenv("SUBUNIT_FORMATTER")
 
1211
    if formatter:
 
1212
        return os.popen(formatter, "w")
 
1213
    else:
 
1214
        stream = sys.stdout
 
1215
        if sys.version_info > (3, 0):
 
1216
            stream = stream.buffer
 
1217
        return stream
 
1218
 
 
1219
 
 
1220
if sys.version_info > (3, 0):
 
1221
    from io import UnsupportedOperation as _NoFilenoError
 
1222
else:
 
1223
    _NoFilenoError = AttributeError
 
1224
 
 
1225
def read_test_list(path):
 
1226
    """Read a list of test ids from a file on disk.
 
1227
 
 
1228
    :param path: Path to the file
 
1229
    :return: Sequence of test ids
 
1230
    """
 
1231
    f = open(path, 'rb')
 
1232
    try:
 
1233
        return [l.rstrip("\n") for l in f.readlines()]
 
1234
    finally:
 
1235
        f.close()
 
1236
 
 
1237
 
 
1238
def _make_stream_binary(stream):
 
1239
    """Ensure that a stream will be binary safe. See _make_binary_on_windows."""
 
1240
    try:
 
1241
        fileno = stream.fileno()
 
1242
    except _NoFilenoError:
 
1243
        return
 
1244
    _make_binary_on_windows(fileno)
 
1245
 
 
1246
def _make_binary_on_windows(fileno):
 
1247
    """Win32 mangles \r\n to \n and that breaks streams. See bug lp:505078."""
 
1248
    if sys.platform == "win32":
 
1249
        import msvcrt
 
1250
        msvcrt.setmode(fileno, os.O_BINARY)