~lifeless/subunit/streamresult

« back to all changes in this revision

Viewing changes to python/subunit/tests/test_subunit_filter.py

  • Committer: Robert Collins
  • Date: 2013-02-26 09:08:32 UTC
  • Revision ID: robertc@robertcollins.net-20130226090832-8bhycg3ecxqqta6l
Update subunit-filter to consume and emit v2.

Show diffs side-by-side

added

removed

Lines of Context:
25
25
 
26
26
from testtools import TestCase
27
27
from testtools.compat import _b, BytesIO
28
 
from testtools.testresult.doubles import ExtendedTestResult
 
28
from testtools.testresult.doubles import ExtendedTestResult, StreamResult
29
29
 
30
30
import subunit
31
31
from subunit.test_results import make_tag_filter, TestResultFilter
 
32
from subunit import ByteStreamToStreamResult, StreamResultToBytes
32
33
 
33
34
 
34
35
class TestTestResultFilter(TestCase):
286
287
 
287
288
class TestFilterCommand(TestCase):
288
289
 
289
 
    example_subunit_stream = _b("""\
290
 
tags: global
291
 
test passed
292
 
success passed
293
 
test failed
294
 
tags: local
295
 
failure failed
296
 
test error
297
 
error error [
298
 
error details
299
 
]
300
 
test skipped
301
 
skip skipped
302
 
test todo
303
 
xfail todo
304
 
""")
305
 
 
306
290
    def run_command(self, args, stream):
307
291
        root = os.path.dirname(
308
292
            os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
316
300
            raise RuntimeError("%s failed: %s" % (command, err))
317
301
        return out
318
302
 
319
 
    def to_events(self, stream):
320
 
        test = subunit.ProtocolTestCase(BytesIO(stream))
321
 
        result = ExtendedTestResult()
322
 
        test.run(result)
323
 
        return result._events
324
 
 
325
303
    def test_default(self):
326
 
        output = self.run_command([], _b(
327
 
                "test: foo\n"
328
 
                "skip: foo\n"
329
 
                ))
330
 
        events = self.to_events(output)
331
 
        foo = subunit.RemotedTestCase('foo')
332
 
        self.assertEqual(
333
 
            [('startTest', foo),
334
 
             ('addSkip', foo, {}),
335
 
             ('stopTest', foo)],
336
 
            events)
 
304
        byte_stream = BytesIO()
 
305
        stream = StreamResultToBytes(byte_stream)
 
306
        stream.status(test_id="foo", test_status="inprogress")
 
307
        stream.status(test_id="foo", test_status="skip")
 
308
        output = self.run_command([], byte_stream.getvalue())
 
309
        events = StreamResult()
 
310
        ByteStreamToStreamResult(BytesIO(output)).run(events)
 
311
        ids = set(event[1] for event in events._events)
 
312
        self.assertEqual([
 
313
            ('status', 'foo', 'inprogress'),
 
314
            ('status', 'foo', 'skip'),
 
315
            ], [event[:3] for event in events._events])
337
316
 
338
317
    def test_tags(self):
339
 
        output = self.run_command(['-s', '--with-tag', 'a'], _b(
340
 
                "tags: a\n"
341
 
                "test: foo\n"
342
 
                "success: foo\n"
343
 
                "tags: -a\n"
344
 
                "test: bar\n"
345
 
                "success: bar\n"
346
 
                "test: baz\n"
347
 
                "tags: a\n"
348
 
                "success: baz\n"
349
 
                ))
350
 
        events = self.to_events(output)
351
 
        foo = subunit.RemotedTestCase('foo')
352
 
        baz = subunit.RemotedTestCase('baz')
353
 
        self.assertEqual(
354
 
            [('tags', set(['a']), set()),
355
 
             ('startTest', foo),
356
 
             ('addSuccess', foo),
357
 
             ('stopTest', foo),
358
 
             ('tags', set(), set(['a'])),
359
 
             ('startTest', baz),
360
 
             ('tags', set(['a']), set()),
361
 
             ('addSuccess', baz),
362
 
             ('stopTest', baz),
363
 
             ],
364
 
            events)
 
318
        byte_stream = BytesIO()
 
319
        stream = StreamResultToBytes(byte_stream)
 
320
        stream.status(
 
321
            test_id="foo", test_status="inprogress", test_tags=set(["a"]))
 
322
        stream.status(
 
323
            test_id="foo", test_status="success", test_tags=set(["a"]))
 
324
        stream.status(test_id="bar", test_status="inprogress")
 
325
        stream.status(test_id="bar", test_status="inprogress")
 
326
        stream.status(
 
327
            test_id="baz", test_status="inprogress", test_tags=set(["a"]))
 
328
        stream.status(
 
329
            test_id="baz", test_status="success", test_tags=set(["a"]))
 
330
        output = self.run_command(
 
331
            ['-s', '--with-tag', 'a'], byte_stream.getvalue())
 
332
        events = StreamResult()
 
333
        ByteStreamToStreamResult(BytesIO(output)).run(events)
 
334
        ids = set(event[1] for event in events._events)
 
335
        self.assertEqual(set(['foo', 'baz']), ids)
 
336
 
 
337
    def test_no_passthrough(self):
 
338
        output = self.run_command(['--no-passthrough'], b'hi thar')
 
339
        self.assertEqual(b'', output)
 
340
 
 
341
    def test_passthrough(self):
 
342
        output = self.run_command([], b'hi thar')
 
343
        byte_stream = BytesIO()
 
344
        stream = StreamResultToBytes(byte_stream)
 
345
        for pos, _ in enumerate(b'hi thar'):
 
346
            stream.status(file_name="stdout", file_bytes=b'hi thar'[pos:pos+1])
 
347
        self.assertEqual(byte_stream.getvalue(), output)
365
348
 
366
349
 
367
350
def test_suite():