122
from StringIO import StringIO
124
from io import StringIO
125
121
import subprocess
129
from subunit import iso8601
130
from testtools.compat import _b, _u
131
from testtools import ExtendedToOriginalDecorator
125
from testtools import content, content_type, ExtendedToOriginalDecorator
126
from testtools.compat import _b, _u, BytesIO, StringIO
133
128
from testtools.testresult.real import _StringException
134
129
RemoteException = _StringException
135
_remote_exception_str = '_StringException' # For testing.
130
# For testing: different pythons have different str() implementations.
131
if sys.version_info > (3, 0):
132
_remote_exception_str = "testtools.testresult.real._StringException"
133
_remote_exception_str_chunked = "34\r\n" + _remote_exception_str
135
_remote_exception_str = "_StringException"
136
_remote_exception_str_chunked = "1A\r\n" + _remote_exception_str
136
137
except ImportError:
137
138
raise ImportError ("testtools.testresult.real does not contain "
138
139
"_StringException, check your version.")
139
from testtools import content, content_type, testresult
140
from testtools import testresult
141
from subunit import chunked, details
142
from subunit import chunked, details, iso8601, test_results
191
192
def __init__(self, parser):
    """Bind this state to its parser and pre-build the directive tokens.

    :param parser: The parser this state belongs to; stored as
        ``self.parser``.
    """
    self.parser = parser
    # The directive keywords are converted once here (via _b / _u) so that
    # handling a received line does not have to convert them again.
    # Every keyword set is a tuple so that ``cmd in self._xxx_sym`` is an
    # exact match against whole keywords.
    self._test_sym = (_b('test'), _b('testing'))
    self._colon_sym = _b(':')
    self._error_sym = (_b('error'),)
    self._failure_sym = (_b('failure'),)
    self._progress_sym = (_b('progress'),)
    # Must be a tuple like its siblings: with a bare value,
    # ``cmd in self._skip_sym`` is a substring test and fragments such as
    # 'ski' or 'kip' would be accepted as the skip directive.
    self._skip_sym = (_b('skip'),)
    self._success_sym = (_b('success'), _b('successful'))
    self._tags_sym = (_b('tags'),)
    self._time_sym = (_b('time'),)
    self._xfail_sym = (_b('xfail'),)
    self._uxsuccess_sym = (_b('uxsuccess'),)
    # Suffixes appended to the current test description when checking
    # whether an outcome line opens a simple or a multipart details section.
    self._start_simple = _u(" [")
    self._start_multipart = _u(" [ multipart")
194
208
def addError(self, offset, line):
195
209
"""An 'error:' directive has been read."""
217
231
if len(parts) == 2 and line.startswith(parts[0]):
218
232
cmd, rest = parts
219
233
offset = len(cmd) + 1
220
cmd = cmd.rstrip(':')
221
if cmd in ('test', 'testing'):
234
cmd = cmd.rstrip(self._colon_sym)
235
if cmd in self._test_sym:
222
236
self.startTest(offset, line)
237
elif cmd in self._error_sym:
224
238
self.addError(offset, line)
225
elif cmd == 'failure':
239
elif cmd in self._failure_sym:
226
240
self.addFailure(offset, line)
227
elif cmd == 'progress':
241
elif cmd in self._progress_sym:
228
242
self.parser._handleProgress(offset, line)
243
elif cmd in self._skip_sym:
230
244
self.addSkip(offset, line)
231
elif cmd in ('success', 'successful'):
245
elif cmd in self._success_sym:
232
246
self.addSuccess(offset, line)
233
elif cmd in ('tags',):
247
elif cmd in self._tags_sym:
234
248
self.parser._handleTags(offset, line)
235
249
self.parser.subunitLineReceived(line)
236
elif cmd in ('time',):
250
elif cmd in self._time_sym:
237
251
self.parser._handleTime(offset, line)
238
252
self.parser.subunitLineReceived(line)
253
elif cmd in self._xfail_sym:
240
254
self.addExpectedFail(offset, line)
255
elif cmd in self._uxsuccess_sym:
256
self.addUnexpectedSuccess(offset, line)
242
258
self.parser.stdOutLineReceived(line)
262
278
:param details_state: The state to switch to for details
263
279
processing of this outcome.
265
if self.parser.current_test_description == line[offset:-1]:
281
test_name = line[offset:-1].decode('utf8')
282
if self.parser.current_test_description == test_name:
266
283
self.parser._state = self.parser._outside_test
267
284
self.parser.current_test_description = None
269
286
self.parser.client.stopTest(self.parser._current_test)
270
287
self.parser._current_test = None
271
288
self.parser.subunitLineReceived(line)
272
elif self.parser.current_test_description + " [" == line[offset:-1]:
289
elif self.parser.current_test_description + self._start_simple == \
273
291
self.parser._state = details_state
274
292
details_state.set_simple()
275
293
self.parser.subunitLineReceived(line)
276
elif self.parser.current_test_description + " [ multipart" == \
294
elif self.parser.current_test_description + self._start_multipart == \
278
296
self.parser._state = details_state
279
297
details_state.set_multipart()
280
298
self.parser.subunitLineReceived(line)
299
317
self._outcome(offset, line, self._xfail,
300
318
self.parser._reading_xfail_details)
320
def _uxsuccess(self):
    """Report the parser's current test to the client as an unexpected success."""
    parser = self.parser
    parser.client.addUnexpectedSuccess(parser._current_test)
323
def addUnexpectedSuccess(self, offset, line):
    """A 'uxsuccess:' directive has been read.

    :param offset: Passed through to ``_outcome`` unchanged.
    :param line: The received line, passed through to ``_outcome``.
    """
    # _outcome receives the no-details callback and the state object that
    # handles any details attached to this outcome.
    details_state = self.parser._reading_uxsuccess_details
    self._outcome(offset, line, self._uxsuccess, details_state)
302
328
def _failure(self):
    """Report the parser's current test to the client as a failure.

    An empty details dict is passed because no details accompany the outcome
    on this path.
    """
    parser = self.parser
    parser.client.addFailure(parser._current_test, details={})
337
363
def startTest(self, offset, line):
    """A test start command received.

    :param offset: Index in ``line`` at which the test name begins.
    :param line: The received line. The name is ``line[offset:-1]`` decoded
        as UTF-8; the final character is dropped (presumably the trailing
        newline - confirm against the caller).
    """
    parser = self.parser
    parser._state = parser._in_test
    # Decode once and use the same text for both the test object and the
    # description that later outcome lines are compared against.
    test_name = line[offset:-1].decode('utf8')
    parser._current_test = RemotedTestCase(test_name)
    parser.current_test_description = test_name
    parser.client.startTest(parser._current_test)
    parser.subunitLineReceived(line)
463
503
self._reading_skip_details = _ReadingSkipDetails(self)
464
504
self._reading_success_details = _ReadingSuccessDetails(self)
465
505
self._reading_xfail_details = _ReadingExpectedFailureDetails(self)
506
self._reading_uxsuccess_details = _ReadingUnexpectedSuccessDetails(self)
466
507
# start with outside test.
467
508
self._state = self._outside_test
509
# Avoid casts on every call
510
self._plusminus = _b('+-')
511
self._push_sym = _b('push')
512
self._pop_sym = _b('pop')
469
514
def _handleProgress(self, offset, line):
470
515
"""Process a progress directive."""
471
516
line = line[offset:].strip()
517
if line[0] in self._plusminus:
473
518
whence = PROGRESS_CUR
474
519
delta = int(line)
520
elif line == self._push_sym:
476
521
whence = PROGRESS_PUSH
523
elif line == self._pop_sym:
479
524
whence = PROGRESS_POP
555
601
testresult.TestResult.__init__(self)
556
602
self._stream = stream
557
603
_make_stream_binary(stream)
604
self._progress_fmt = _b("progress: ")
605
self._bytes_eol = _b("\n")
606
self._progress_plus = _b("+")
607
self._progress_push = _b("push")
608
self._progress_pop = _b("pop")
609
self._empty_bytes = _b("")
610
self._start_simple = _b(" [\n")
611
self._end_simple = _b("]\n")
559
613
def addError(self, test, error=None, details=None):
560
614
"""Report an error in test test.
602
656
self._addOutcome("failure", test, error=error, details=details)
604
def _addOutcome(self, outcome, test, error=None, details=None):
658
def _addOutcome(self, outcome, test, error=None, details=None,
659
error_permitted=True):
605
660
"""Report a failure in test test.
607
662
Only one of error and details should be provided: conceptually there
616
671
:param details: New Testing-in-python drafted API; a dict from string
617
672
to subunit.Content objects.
619
self._stream.write("%s: %s" % (outcome, test.id()))
620
if error is None and details is None:
673
:param error_permitted: If True then one and only one of error or
674
details must be supplied. If False then error must not be supplied
675
and details is still optional. """
676
self._stream.write(_b("%s: %s" % (outcome, test.id())))
678
if error is None and details is None:
681
if error is not None:
622
683
if error is not None:
623
self._stream.write(" [\n")
684
self._stream.write(self._start_simple)
624
685
# XXX: this needs to be made much stricter, along the lines of
625
686
# Martin[gz]'s work in testtools. Perhaps subunit can use that?
626
687
for line in self._exc_info_to_unicode(error, test).splitlines():
627
688
self._stream.write(("%s\n" % line).encode('utf8'))
689
elif details is not None:
690
self._write_details(details)
629
self._write_details(details)
630
self._stream.write("]\n")
692
self._stream.write(_b("\n"))
693
if details is not None or error is not None:
694
self._stream.write(self._end_simple)
632
696
def addSkip(self, test, reason=None, details=None):
    """Report a skipped test.

    :param test: The skipped test; ``test.id()`` names it on the stream.
    :param reason: Optional text explaining the skip. When given it is
        written as the bracketed body of the skip record and ``details`` is
        not used.
    :param details: Optional details, forwarded to ``_addOutcome`` when no
        reason is supplied.
    """
    if reason is None:
        self._addOutcome("skip", test, error=None, details=details)
    else:
        # The record is written exactly once, converted with _b before it
        # reaches the stream.
        self._stream.write(_b("skip: %s [\n" % test.id()))
        self._stream.write(_b("%s\n" % reason))
        self._stream.write(self._end_simple)
641
705
def addSuccess(self, test, details=None):
    """Report a success in a test.

    :param test: The test that succeeded.
    :param details: Optional details to attach to the outcome.
    """
    # error_permitted=False: a success never carries an error, so only the
    # optional details are forwarded.
    self._addOutcome(
        "successful", test, details=details, error_permitted=False)
709
def addUnexpectedSuccess(self, test, details=None):
    """Report an unexpected success in test test.

    Details can optionally be provided: conceptually there
    are two separate methods:
        addUnexpectedSuccess(self, test)
        addUnexpectedSuccess(self, test, details)

    :param test: The test that unexpectedly succeeded.
    :param details: New Testing-in-python drafted API; a dict from string
        to subunit.Content objects.
    """
    # error_permitted=False: this outcome never carries an error, so only the
    # optional details are forwarded.
    self._addOutcome(
        "uxsuccess", test, details=details, error_permitted=False)
651
723
def startTest(self, test):
    """Mark a test as starting its test run.

    :param test: The test that is starting; ``test.id()`` is written to the
        stream in a ``test:`` line.
    """
    super(TestProtocolClient, self).startTest(test)
    # Written once, converted with _b, then flushed straight away so the
    # start marker is not held back in a buffer.
    self._stream.write(_b("test: %s\n" % test.id()))
    self._stream.flush()
657
729
def stopTest(self, test):
671
743
if whence == PROGRESS_CUR and offset > -1:
744
prefix = self._progress_plus
745
offset = _b(str(offset))
673
746
elif whence == PROGRESS_PUSH:
747
prefix = self._empty_bytes
748
offset = self._progress_push
676
749
elif whence == PROGRESS_POP:
750
prefix = self._empty_bytes
751
offset = self._progress_pop
681
self._stream.write("progress: %s%s\n" % (prefix, offset))
753
prefix = self._empty_bytes
754
offset = _b(str(offset))
755
self._stream.write(self._progress_fmt + prefix + offset +
683
758
def time(self, a_datetime):
    """Inform the client of the time.

    :param a_datetime: A datetime.datetime object. It is converted to UTC
        with ``astimezone`` before being written.
        NOTE(review): presumably this must be timezone-aware - confirm.
    """
    utc_time = a_datetime.astimezone(iso8601.Utc())
    # A single 'time:' line is written, with microsecond precision and a
    # trailing 'Z'.
    self._stream.write(_b("time: %04d-%02d-%02d %02d:%02d:%02d.%06dZ\n" % (
        utc_time.year, utc_time.month, utc_time.day, utc_time.hour,
        utc_time.minute, utc_time.second, utc_time.microsecond)))
693
768
def _write_details(self, details):
694
769
"""Output details to the stream.
696
771
:param details: An extended details dict for a test outcome.
698
self._stream.write(" [ multipart\n")
699
for name, content in sorted(details.iteritems()):
700
self._stream.write("Content-Type: %s/%s" %
701
(content.content_type.type, content.content_type.subtype))
773
self._stream.write(_b(" [ multipart\n"))
774
for name, content in sorted(details.items()):
775
self._stream.write(_b("Content-Type: %s/%s" %
776
(content.content_type.type, content.content_type.subtype)))
702
777
parameters = content.content_type.parameters
704
self._stream.write(";")
779
self._stream.write(_b(";"))
706
for param, value in parameters.iteritems():
781
for param, value in parameters.items():
707
782
param_strs.append("%s=%s" % (param, value))
708
self._stream.write(",".join(param_strs))
709
self._stream.write("\n%s\n" % name)
783
self._stream.write(_b(",".join(param_strs)))
784
self._stream.write(_b("\n%s\n" % name))
710
785
encoder = chunked.Encoder(self._stream)
711
map(encoder.write, content.iter_bytes())
786
list(map(encoder.write, content.iter_bytes()))
800
875
def _run(self, result):
    """Run ``self.script`` and feed its standard output to a protocol server.

    :param result: The result object handed to ``TestProtocolServer``.
    """
    protocol = TestProtocolServer(result)
    # NOTE(review): shell=True hands self.script to the shell, so the script
    # string must come from a trusted source.
    process = subprocess.Popen(
        self.script, shell=True, stdout=subprocess.PIPE)
    _make_stream_binary(process.stdout)
    output = process.communicate()[0]
    # The captured output is passed on as bytes, without decoding, and the
    # script is run only once.
    protocol.readFrom(BytesIO(output))
807
884
class IsolatedTestCase(unittest.TestCase):
1135
1212
return os.popen(formatter, "w")
1215
if sys.version_info > (3, 0):
1216
stream = stream.buffer
1220
if sys.version_info > (3, 0):
1221
from io import UnsupportedOperation as _NoFilenoError
1223
_NoFilenoError = AttributeError
1225
def read_test_list(path):
    """Read a list of test ids from a file on disk.

    The file is opened in text mode so each id is a native string and the
    ``"\\n"`` strip works on both Python 2 and Python 3. In binary mode,
    Python 3 raises TypeError when bytes lines are stripped with a str.

    :param path: Path to the file
    :return: Sequence of test ids, one per line, with the trailing newline
        removed. Other whitespace is preserved.
    """
    # The context manager closes the file even if reading fails.
    with open(path, 'r') as f:
        return [line.rstrip("\n") for line in f]
1140
1238
def _make_stream_binary(stream):
    """Ensure that a stream will be binary safe. See _make_binary_on_windows.

    Streams that cannot supply a file descriptor are left untouched.

    :param stream: The stream to adjust. It needs a usable ``fileno()`` for
        anything to happen.
    """
    try:
        fileno = stream.fileno()
    except (_NoFilenoError, AttributeError):
        # _NoFilenoError covers streams whose fileno() refuses to answer;
        # AttributeError covers objects that have no fileno() at all.
        return
    _make_binary_on_windows(fileno)
1145
1246
def _make_binary_on_windows(fileno):
1146
1247
"""Win32 mangles \r\n to \n and that breaks streams. See bug lp:505078."""