1
"""Unit tests for io.py."""
2
from __future__ import print_function
3
from __future__ import unicode_literals
12
from itertools import chain, cycle
13
from test import test_support
16
import io # The module under test
19
class MockRawIO(io.RawIOBase):
21
def __init__(self, read_stack=()):
22
self._read_stack = list(read_stack)
23
self._write_stack = []
25
def read(self, n=None):
27
return self._read_stack.pop(0)
32
self._write_stack.append(b[:])
47
def seek(self, pos, whence):
54
class MockFileIO(io.BytesIO):
56
def __init__(self, data):
57
self.read_history = []
58
io.BytesIO.__init__(self, data)
60
def read(self, n=None):
61
res = io.BytesIO.read(self, n)
62
self.read_history.append(None if res is None else len(res))
66
class MockNonBlockWriterIO(io.RawIOBase):
68
def __init__(self, blocking_script):
69
self._blocking_script = list(blocking_script)
70
self._write_stack = []
73
self._write_stack.append(b[:])
74
n = self._blocking_script.pop(0)
76
raise io.BlockingIOError(0, "test blocking", -n)
84
class IOTest(unittest.TestCase):
87
test_support.unlink(test_support.TESTFN)
89
def write_ops(self, f):
    """Exercise write/seek/tell/truncate on a writable binary stream.

    ``f`` is expected to be empty and positioned at offset 0.  The return
    value of every call is checked.  When the sequence completes the
    stream holds the 12 bytes ``b"hello world\\n"``.
    """
    # Initial write, then rewind and overwrite it with a longer string.
    self.assertEqual(f.write(b"blah."), 5)
    self.assertEqual(f.seek(0), 0)
    self.assertEqual(f.write(b"Hello."), 6)
    self.assertEqual(f.tell(), 6)

    # Step back one byte relative to the current position (whence=1) so
    # the trailing "." is overwritten by the next write.
    self.assertEqual(f.seek(-1, 1), 5)
    self.assertEqual(f.tell(), 5)
    # write() must also accept a bytearray, not only bytes.
    self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)

    # Patch the first byte: "Hello" -> "hello".
    self.assertEqual(f.seek(0), 0)
    self.assertEqual(f.write(b"h"), 1)

    # Seek relative to the end (whence=2); the stream is 14 bytes long.
    self.assertEqual(f.seek(-1, 2), 13)
    self.assertEqual(f.tell(), 13)

    # Drop the two surplus newlines; the position is expected to follow
    # the new end of the stream.
    self.assertEqual(f.truncate(12), 12)
    self.assertEqual(f.tell(), 12)

    # Float offsets must be rejected rather than silently truncated.
    self.assertRaises(TypeError, f.seek, 0.0)
105
def read_ops(self, f, buffered=False):
107
self.assertEqual(data, b"hello")
108
data = bytearray(data)
109
self.assertEqual(f.readinto(data), 5)
110
self.assertEqual(data, b" worl")
111
self.assertEqual(f.readinto(data), 2)
112
self.assertEqual(len(data), 5)
113
self.assertEqual(data[:2], b"d\n")
114
self.assertEqual(f.seek(0), 0)
115
self.assertEqual(f.read(20), b"hello world\n")
116
self.assertEqual(f.read(1), b"")
117
self.assertEqual(f.readinto(bytearray(b"x")), 0)
118
self.assertEqual(f.seek(-6, 2), 6)
119
self.assertEqual(f.read(5), b"world")
120
self.assertEqual(f.read(0), b"")
121
self.assertEqual(f.readinto(bytearray()), 0)
122
self.assertEqual(f.seek(-6, 1), 5)
123
self.assertEqual(f.read(5), b" worl")
124
self.assertEqual(f.tell(), 10)
125
self.assertRaises(TypeError, f.seek, 0.0)
128
self.assertEqual(f.read(), b"hello world\n")
130
self.assertEqual(f.read(), b"world\n")
131
self.assertEqual(f.read(), b"")
135
def large_file_ops(self, f):
138
self.assertEqual(f.seek(self.LARGE), self.LARGE)
139
self.assertEqual(f.tell(), self.LARGE)
140
self.assertEqual(f.write(b"xxx"), 3)
141
self.assertEqual(f.tell(), self.LARGE + 3)
142
self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
143
self.assertEqual(f.truncate(), self.LARGE + 2)
144
self.assertEqual(f.tell(), self.LARGE + 2)
145
self.assertEqual(f.seek(0, 2), self.LARGE + 2)
146
self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
147
self.assertEqual(f.tell(), self.LARGE + 1)
148
self.assertEqual(f.seek(0, 2), self.LARGE + 1)
149
self.assertEqual(f.seek(-1, 2), self.LARGE)
150
self.assertEqual(f.read(2), b"x")
152
def test_raw_file_io(self):
153
f = io.open(test_support.TESTFN, "wb", buffering=0)
154
self.assertEqual(f.readable(), False)
155
self.assertEqual(f.writable(), True)
156
self.assertEqual(f.seekable(), True)
159
f = io.open(test_support.TESTFN, "rb", buffering=0)
160
self.assertEqual(f.readable(), True)
161
self.assertEqual(f.writable(), False)
162
self.assertEqual(f.seekable(), True)
166
def test_buffered_file_io(self):
167
f = io.open(test_support.TESTFN, "wb")
168
self.assertEqual(f.readable(), False)
169
self.assertEqual(f.writable(), True)
170
self.assertEqual(f.seekable(), True)
173
f = io.open(test_support.TESTFN, "rb")
174
self.assertEqual(f.readable(), True)
175
self.assertEqual(f.writable(), False)
176
self.assertEqual(f.seekable(), True)
177
self.read_ops(f, True)
180
def test_readline(self):
181
f = io.open(test_support.TESTFN, "wb")
182
f.write(b"abc\ndef\nxyzzy\nfoo")
184
f = io.open(test_support.TESTFN, "rb")
185
self.assertEqual(f.readline(), b"abc\n")
186
self.assertEqual(f.readline(10), b"def\n")
187
self.assertEqual(f.readline(2), b"xy")
188
self.assertEqual(f.readline(4), b"zzy\n")
189
self.assertEqual(f.readline(), b"foo")
192
def test_raw_bytes_io(self):
196
self.assertEqual(data, b"hello world\n")
198
self.read_ops(f, True)
200
def test_large_file_ops(self):
201
# On Windows and Mac OSX this test consumes large resources; it takes
202
# a long time to build the >2GB file and takes >2GB of disk space
203
# therefore the resource must be enabled to run this test.
204
if sys.platform[:3] in ('win', 'os2') or sys.platform == 'darwin':
205
if not test_support.is_resource_enabled("largefile"):
206
print("\nTesting large file ops skipped on %s." % sys.platform,
208
print("It requires %d bytes and a long time." % self.LARGE,
210
print("Use 'regrtest.py -u largefile test_io' to run it.",
213
f = io.open(test_support.TESTFN, "w+b", 0)
214
self.large_file_ops(f)
216
f = io.open(test_support.TESTFN, "w+b")
217
self.large_file_ops(f)
220
def test_with_open(self):
221
for bufsize in (0, 1, 100):
223
with open(test_support.TESTFN, "wb", bufsize) as f:
225
self.assertEqual(f.closed, True)
228
with open(test_support.TESTFN, "wb", bufsize) as f:
230
except ZeroDivisionError:
231
self.assertEqual(f.closed, True)
233
self.fail("1/0 didn't raise an exception")
235
def test_destructor(self):
237
class MyFileIO(io.FileIO):
240
io.FileIO.__del__(self)
243
io.FileIO.close(self)
246
io.FileIO.flush(self)
247
f = MyFileIO(test_support.TESTFN, "w")
250
self.assertEqual(record, [1, 2, 3])
252
def test_close_flushes(self):
253
f = io.open(test_support.TESTFN, "wb")
256
f = io.open(test_support.TESTFN, "rb")
257
self.assertEqual(f.read(), b"xxx")
260
def XXXtest_array_writes(self):
261
# XXX memory view not available yet
262
a = array.array('i', range(10))
263
n = len(memoryview(a))
264
f = io.open(test_support.TESTFN, "wb", 0)
265
self.assertEqual(f.write(a), n)
267
f = io.open(test_support.TESTFN, "wb")
268
self.assertEqual(f.write(a), n)
271
def test_closefd(self):
272
self.assertRaises(ValueError, io.open, test_support.TESTFN, 'w',
275
class MemorySeekTestMixin:
278
buf = self.buftype("1234567890")
279
bytesIo = self.ioclass(buf)
282
buf = self.buftype("1234567890")
283
bytesIo = self.ioclass(buf)
285
self.assertEquals(buf[:1], bytesIo.read(1))
286
self.assertEquals(buf[1:5], bytesIo.read(4))
287
self.assertEquals(buf[5:], bytesIo.read(900))
288
self.assertEquals(self.EOF, bytesIo.read())
290
def testReadNoArgs(self):
291
buf = self.buftype("1234567890")
292
bytesIo = self.ioclass(buf)
294
self.assertEquals(buf, bytesIo.read())
295
self.assertEquals(self.EOF, bytesIo.read())
298
buf = self.buftype("1234567890")
299
bytesIo = self.ioclass(buf)
303
self.assertEquals(buf, bytesIo.read())
306
self.assertEquals(buf[3:], bytesIo.read())
307
self.assertRaises(TypeError, bytesIo.seek, 0.0)
310
buf = self.buftype("1234567890")
311
bytesIo = self.ioclass(buf)
313
self.assertEquals(0, bytesIo.tell())
315
self.assertEquals(5, bytesIo.tell())
317
self.assertEquals(10000, bytesIo.tell())
320
class BytesIOTest(MemorySeekTestMixin, unittest.TestCase):
323
return s.encode("utf-8")
328
class StringIOTest(MemorySeekTestMixin, unittest.TestCase):
330
ioclass = io.StringIO
334
class BufferedReaderTest(unittest.TestCase):
337
rawio = MockRawIO((b"abc", b"d", b"efg"))
338
bufio = io.BufferedReader(rawio)
340
self.assertEquals(b"abcdef", bufio.read(6))
342
def testBuffering(self):
347
[ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
348
[ 100, [ 3, 3, 3], [ dlen ] ],
349
[ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
352
for bufsize, buf_read_sizes, raw_read_sizes in tests:
353
rawio = MockFileIO(data)
354
bufio = io.BufferedReader(rawio, buffer_size=bufsize)
356
for nbytes in buf_read_sizes:
357
self.assertEquals(bufio.read(nbytes), data[pos:pos+nbytes])
359
self.assertEquals(rawio.read_history, raw_read_sizes)
361
def testReadNonBlocking(self):
    """Check BufferedReader.read() when the raw stream has no data ready.

    The raw stream is scripted with a sequence of chunks in which ``None``
    stands for "no data available right now".
    """
    # Inject some None's in there to simulate EWOULDBLOCK
    rawio = MockRawIO((b"abc", b"d", None, b"efg", None, None))
    bufio = io.BufferedReader(rawio)

    # A sized read stops short at the first None instead of waiting for
    # the remaining two bytes.
    self.assertEqual(b"abcd", bufio.read(6))
    self.assertEqual(b"e", bufio.read(1))
    # An unsized read returns whatever was gathered before the next None.
    self.assertEqual(b"fg", bufio.read())
    # With nothing gathered at all, "would block" is reported as None ...
    self.assertTrue(None is bufio.read())
    # ... which is distinct from the empty bytes returned afterwards.
    self.assertEqual(b"", bufio.read())
372
def testReadToEof(self):
    """Check that an oversized read() returns all data up to EOF."""
    rawio = MockRawIO((b"abc", b"d", b"efg"))
    bufio = io.BufferedReader(rawio)

    # Asking for far more than is available must join every raw chunk
    # into a single result rather than fail or return a partial read.
    self.assertEqual(b"abcdefg", bufio.read(9000))
378
def testReadNoArgs(self):
    """Check that BufferedReader.read() without a size reads to EOF."""
    rawio = MockRawIO((b"abc", b"d", b"efg"))
    bufio = io.BufferedReader(rawio)

    # All three raw chunks are expected back as one bytes object.
    self.assertEqual(b"abcdefg", bufio.read())
384
def testFileno(self):
    """Check that BufferedReader.fileno() reports the raw stream's value."""
    rawio = MockRawIO((b"abc", b"d", b"efg"))
    bufio = io.BufferedReader(rawio)

    # 42 is presumably the constant returned by MockRawIO.fileno(); that
    # method is not visible in this part of the file -- confirm there.
    self.assertEqual(42, bufio.fileno())
390
def testFilenoNoFileno(self):
391
# XXX will we always have fileno() function? If so, kill
392
# this test. Else, write it.
395
def testThreads(self):
397
# Write out many bytes with exactly the same number of 0's,
398
# 1's... 255's. This will help us check that concurrent reading
399
# doesn't duplicate or forget contents.
403
s = bytes(bytearray(l))
404
with io.open(test_support.TESTFN, "wb") as f:
406
with io.open(test_support.TESTFN, "rb", buffering=0) as raw:
407
bufio = io.BufferedReader(raw, 8)
412
# Intra-buffer read then buffer-flushing read
413
for n in cycle([1, 19]):
417
# list.append() is atomic
419
except Exception as e:
422
threads = [threading.Thread(target=f) for x in range(20)]
425
time.sleep(0.02) # yield
428
self.assertFalse(errors,
429
"the following exceptions were caught: %r" % errors)
430
s = b''.join(results)
432
c = bytes(bytearray([i]))
433
self.assertEqual(s.count(c), N)
435
test_support.unlink(test_support.TESTFN)
439
class BufferedWriterTest(unittest.TestCase):
442
# Write to the buffered IO but don't overflow the buffer.
444
bufio = io.BufferedWriter(writer, 8)
448
self.assertFalse(writer._write_stack)
450
def testWriteOverflow(self):
452
bufio = io.BufferedWriter(writer, 8)
455
bufio.write(b"defghijkl")
457
self.assertEquals(b"abcdefghijkl", writer._write_stack[0])
459
def testWriteNonBlocking(self):
460
raw = MockNonBlockWriterIO((9, 2, 22, -6, 10, 12, 12))
461
bufio = io.BufferedWriter(raw, 8, 16)
464
bufio.write(b"asdfa")
465
self.assertEquals(b"asdfasdfa", raw._write_stack[0])
467
bufio.write(b"asdfasdfasdf")
468
self.assertEquals(b"asdfasdfasdf", raw._write_stack[1])
469
bufio.write(b"asdfasdfasdf")
470
self.assertEquals(b"dfasdfasdf", raw._write_stack[2])
471
self.assertEquals(b"asdfasdfasdf", raw._write_stack[3])
473
bufio.write(b"asdfasdfasdf")
475
# XXX I don't like this test. It relies too heavily on how the
476
# algorithm actually works, which we might change. Refactor
479
def testFileno(self):
    """Check that BufferedWriter.fileno() reports the raw stream's value."""
    # The read chunks are irrelevant here; only fileno() is exercised.
    rawio = MockRawIO((b"abc", b"d", b"efg"))
    bufio = io.BufferedWriter(rawio)

    # 42 is presumably the constant returned by MockRawIO.fileno(); that
    # method is not visible in this part of the file -- confirm there.
    self.assertEqual(42, bufio.fileno())
487
bufio = io.BufferedWriter(writer, 8)
492
self.assertEquals(b"abc", writer._write_stack[0])
494
def testThreads(self):
495
# BufferedWriter should not raise exceptions or crash
496
# when called from multiple threads.
498
# We use a real file object because it allows us to
499
# exercise situations where the GIL is released before
500
# writing the buffer to the raw streams. This is in addition
501
# to concurrency issues due to switching threads in the middle
503
with io.open(test_support.TESTFN, "wb", buffering=0) as raw:
504
bufio = io.BufferedWriter(raw, 8)
508
# Write enough bytes to flush the buffer
512
except Exception as e:
515
threads = [threading.Thread(target=f) for x in range(20)]
518
time.sleep(0.02) # yield
521
self.assertFalse(errors,
522
"the following exceptions were caught: %r" % errors)
524
test_support.unlink(test_support.TESTFN)
527
class BufferedRWPairTest(unittest.TestCase):
529
def testRWPair(self):
532
pair = io.BufferedRWPair(r, w)
534
# XXX need implementation
537
class BufferedRandomTest(unittest.TestCase):
539
def testReadAndWrite(self):
540
raw = MockRawIO((b"asdf", b"ghjk"))
541
rw = io.BufferedRandom(raw, 8, 12)
543
self.assertEqual(b"as", rw.read(2))
546
self.assertFalse(raw._write_stack) # Buffer writes
547
self.assertEqual(b"ghjk", rw.read()) # This read forces write flush
548
self.assertEquals(b"dddeee", raw._write_stack[0])
550
def testSeekAndTell(self):
551
raw = io.BytesIO(b"asdfghjkl")
552
rw = io.BufferedRandom(raw)
554
self.assertEquals(b"as", rw.read(2))
555
self.assertEquals(2, rw.tell())
557
self.assertEquals(b"asdf", rw.read(4))
561
self.assertEquals(b"asdfasdfl", rw.read())
562
self.assertEquals(9, rw.tell())
564
self.assertEquals(5, rw.tell())
566
self.assertEquals(7, rw.tell())
567
self.assertEquals(b"fl", rw.read(11))
568
self.assertRaises(TypeError, rw.seek, 0.0)
570
# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
572
# - A single output character can correspond to many bytes of input.
573
# - The number of input bytes to complete the character can be
574
# undetermined until the last input byte is received.
575
# - The number of input bytes can vary depending on previous input.
576
# - A single input byte can correspond to many characters of output.
577
# - The number of output characters can be undetermined until the
578
# last input byte is received.
579
# - The number of output characters can vary depending on previous input.
581
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
583
For testing seek/tell behavior with a stateful, buffering decoder.
585
Input is a sequence of words. Words may be fixed-length (length set
586
by input) or variable-length (period-terminated). In variable-length
587
mode, extra periods are ignored. Possible words are:
588
- 'i' followed by a number sets the input length, I (maximum 99).
589
When I is set to 0, words are space-terminated.
590
- 'o' followed by a number sets the output length, O (maximum 99).
591
- Any other word is converted into a word followed by a period on
592
the output. The output word consists of the input word truncated
593
or padded out with hyphens to make its length equal to O. If O
594
is 0, the word is output verbatim without truncating or padding.
595
I and O are initially set to 1. When I changes, any buffered input is
596
re-scanned according to the new I. EOF also terminates the last word.
599
def __init__(self, errors='strict'):
600
codecs.IncrementalDecoder.__init__(self, errors)
604
return '<SID %x>' % id(self)
609
self.buffer = bytearray()
612
i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
613
return bytes(self.buffer), i*100 + o
615
def setstate(self, state):
617
self.buffer = bytearray(buffer)
618
i, o = divmod(io, 100)
619
self.i, self.o = i ^ 1, o ^ 1
621
def decode(self, input, final=False):
624
if self.i == 0: # variable-length, terminated with period
627
output += self.process_word()
629
self.buffer.append(b)
630
else: # fixed-length, terminate after self.i bytes
631
self.buffer.append(b)
632
if len(self.buffer) == self.i:
633
output += self.process_word()
634
if final and self.buffer: # EOF terminates the last word
635
output += self.process_word()
638
def process_word(self):
640
if self.buffer[0] == ord('i'):
641
self.i = min(99, int(self.buffer[1:] or 0)) # set input length
642
elif self.buffer[0] == ord('o'):
643
self.o = min(99, int(self.buffer[1:] or 0)) # set output length
645
output = self.buffer.decode('ascii')
646
if len(output) < self.o:
647
output += '-'*self.o # pad out with hyphens
649
output = output[:self.o] # truncate to output length
651
self.buffer = bytearray()
657
def lookupTestDecoder(cls, name):
658
if cls.codecEnabled and name == 'test_decoder':
659
return codecs.CodecInfo(
660
name='test_decoder', encode=None, decode=None,
661
incrementalencoder=None,
662
streamreader=None, streamwriter=None,
663
incrementaldecoder=cls)
665
# Register the previous decoder for testing.
666
# Disabled by default, tests will enable it.
667
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
670
class StatefulIncrementalDecoderTest(unittest.TestCase):
672
Make sure the StatefulIncrementalDecoder actually works.
676
# I=1, O=1 (fixed-length input == fixed-length output)
677
(b'abcd', False, 'a.b.c.d.'),
678
# I=0, O=0 (variable-length input, variable-length output)
679
(b'oiabcd', True, 'abcd.'),
680
# I=0, O=0 (should ignore extra periods)
681
(b'oi...abcd...', True, 'abcd.'),
682
# I=0, O=6 (variable-length input, fixed-length output)
683
(b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
684
# I=2, O=6 (fixed-length input < fixed-length output)
685
(b'i.i2.o6xyz', True, 'xy----.z-----.'),
686
# I=6, O=3 (fixed-length input > fixed-length output)
687
(b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
688
# I=0, then 3; O=29, then 15 (with longer output)
689
(b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
690
'a----------------------------.' +
691
'b----------------------------.' +
692
'cde--------------------------.' +
702
def testDecoder(self):
    """Run StatefulIncrementalDecoder over the shared test cases.

    Each entry of ``self.test_cases`` is an ``(input, eof, output)``
    triple: decoding ``input`` in one call with ``final=eof`` on a fresh
    decoder must produce ``output``.
    """
    # Try a few one-shot test cases.
    for data, eof, expected in self.test_cases:
        d = StatefulIncrementalDecoder()
        self.assertEqual(d.decode(data, eof), expected)

    # Also test an unfinished decode, followed by forcing EOF.
    d = StatefulIncrementalDecoder()
    # The last word has no terminator yet, so nothing is emitted ...
    self.assertEqual(d.decode(b'oiabcd'), '')
    # ... until EOF is signalled, which terminates the buffered word.
    self.assertEqual(d.decode(b'', True), 'abcd.')
713
class TextIOWrapperTest(unittest.TestCase):
716
self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
717
self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
720
test_support.unlink(test_support.TESTFN)
722
def testLineBuffering(self):
724
b = io.BufferedWriter(r, 1000)
725
t = io.TextIOWrapper(b, newline="\n", line_buffering=True)
727
self.assertEquals(r.getvalue(), b"") # No flush happened
729
self.assertEquals(r.getvalue(), b"XY\nZ") # All got flushed
731
self.assertEquals(r.getvalue(), b"XY\nZA\rB")
733
def testEncodingErrorsReading(self):
735
b = io.BytesIO(b"abc\n\xff\n")
736
t = io.TextIOWrapper(b, encoding="ascii")
737
self.assertRaises(UnicodeError, t.read)
738
# (2) explicit strict
739
b = io.BytesIO(b"abc\n\xff\n")
740
t = io.TextIOWrapper(b, encoding="ascii", errors="strict")
741
self.assertRaises(UnicodeError, t.read)
743
b = io.BytesIO(b"abc\n\xff\n")
744
t = io.TextIOWrapper(b, encoding="ascii", errors="ignore")
745
self.assertEquals(t.read(), "abc\n\n")
747
b = io.BytesIO(b"abc\n\xff\n")
748
t = io.TextIOWrapper(b, encoding="ascii", errors="replace")
749
self.assertEquals(t.read(), u"abc\n\ufffd\n")
751
def testEncodingErrorsWriting(self):
754
t = io.TextIOWrapper(b, encoding="ascii")
755
self.assertRaises(UnicodeError, t.write, u"\xff")
756
# (2) explicit strict
758
t = io.TextIOWrapper(b, encoding="ascii", errors="strict")
759
self.assertRaises(UnicodeError, t.write, u"\xff")
762
t = io.TextIOWrapper(b, encoding="ascii", errors="ignore",
764
t.write(u"abc\xffdef\n")
766
self.assertEquals(b.getvalue(), b"abcdef\n")
769
t = io.TextIOWrapper(b, encoding="ascii", errors="replace",
771
t.write(u"abc\xffdef\n")
773
self.assertEquals(b.getvalue(), b"abc?def\n")
775
def testNewlinesInput(self):
776
testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
777
normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
778
for newline, expected in [
779
(None, normalized.decode("ascii").splitlines(True)),
780
("", testdata.decode("ascii").splitlines(True)),
781
("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
782
("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
783
("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
785
buf = io.BytesIO(testdata)
786
txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
787
self.assertEquals(txt.readlines(), expected)
789
self.assertEquals(txt.read(), "".join(expected))
791
def testNewlinesOutput(self):
793
"": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
794
"\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
795
"\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
796
"\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
798
tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
799
for newline, expected in tests:
801
txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
803
txt.write("BB\nCCC\n")
804
txt.write("X\rY\r\nZ")
806
self.assertEquals(buf.closed, False)
807
self.assertEquals(buf.getvalue(), expected)
809
def testNewlines(self):
810
input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
813
[ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
815
[ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
816
[ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
817
[ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
820
encodings = ('utf-8', 'latin-1')
822
# Try a range of buffer sizes to test the case where \r is the last
823
# character in TextIOWrapper._pending_line.
824
for encoding in encodings:
825
# XXX: str.encode() should return bytes
826
data = bytes(''.join(input_lines).encode(encoding))
827
for do_reads in (False, True):
828
for bufsize in range(1, 10):
829
for newline, exp_lines in tests:
830
bufio = io.BufferedReader(io.BytesIO(data), bufsize)
831
textio = io.TextIOWrapper(bufio, newline=newline,
839
self.assertEquals(len(c2), 2)
840
got_lines.append(c2 + textio.readline())
842
got_lines = list(textio)
844
for got_line, exp_line in zip(got_lines, exp_lines):
845
self.assertEquals(got_line, exp_line)
846
self.assertEquals(len(got_lines), len(exp_lines))
848
def testNewlinesInput(self):
849
testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
850
normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
851
for newline, expected in [
852
(None, normalized.decode("ascii").splitlines(True)),
853
("", testdata.decode("ascii").splitlines(True)),
854
("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
855
("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
856
("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
858
buf = io.BytesIO(testdata)
859
txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
860
self.assertEquals(txt.readlines(), expected)
862
self.assertEquals(txt.read(), "".join(expected))
864
def testNewlinesOutput(self):
865
data = u"AAA\nBBB\rCCC\n"
866
data_lf = b"AAA\nBBB\rCCC\n"
867
data_cr = b"AAA\rBBB\rCCC\r"
868
data_crlf = b"AAA\r\nBBB\rCCC\r\n"
869
save_linesep = os.linesep
871
for os.linesep, newline, expected in [
872
("\n", None, data_lf),
873
("\r\n", None, data_crlf),
875
("\r\n", "", data_lf),
876
("\n", "\n", data_lf),
877
("\r\n", "\n", data_lf),
878
("\n", "\r", data_cr),
879
("\r\n", "\r", data_cr),
880
("\n", "\r\n", data_crlf),
881
("\r\n", "\r\n", data_crlf),
884
txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
887
self.assertEquals(buf.closed, True)
888
self.assertRaises(ValueError, buf.getvalue)
890
os.linesep = save_linesep
892
# Systematic tests of the text I/O API
894
def testBasicIO(self):
895
for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
896
for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
897
f = io.open(test_support.TESTFN, "w+", encoding=enc)
898
f._CHUNK_SIZE = chunksize
899
self.assertEquals(f.write(u"abc"), 3)
901
f = io.open(test_support.TESTFN, "r+", encoding=enc)
902
f._CHUNK_SIZE = chunksize
903
self.assertEquals(f.tell(), 0)
904
self.assertEquals(f.read(), u"abc")
906
self.assertEquals(f.seek(0), 0)
907
self.assertEquals(f.read(2), u"ab")
908
self.assertEquals(f.read(1), u"c")
909
self.assertEquals(f.read(1), u"")
910
self.assertEquals(f.read(), u"")
911
self.assertEquals(f.tell(), cookie)
912
self.assertEquals(f.seek(0), 0)
913
self.assertEquals(f.seek(0, 2), cookie)
914
self.assertEquals(f.write(u"def"), 3)
915
self.assertEquals(f.seek(cookie), cookie)
916
self.assertEquals(f.read(), u"def")
917
if enc.startswith("utf"):
918
self.multi_line_test(f, enc)
921
def multi_line_test(self, f, enc):
924
sample = u"s\xff\u0fff\uffff"
926
for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
928
for i in range(size):
929
chars.append(sample[i % len(sample)])
930
line = u"".join(chars) + u"\n"
931
wlines.append((f.tell(), line))
940
rlines.append((pos, line))
941
self.assertEquals(rlines, wlines)
943
def testTelling(self):
944
f = io.open(test_support.TESTFN, "w+", encoding="utf8")
951
self.assertEquals(f.tell(), p0)
952
self.assertEquals(f.readline(), u"\xff\n")
953
self.assertEquals(f.tell(), p1)
954
self.assertEquals(f.readline(), u"\xff\n")
955
self.assertEquals(f.tell(), p2)
958
self.assertEquals(line, u"\xff\n")
959
self.assertRaises(IOError, f.tell)
960
self.assertEquals(f.tell(), p2)
963
def testSeeking(self):
964
chunk_size = io.TextIOWrapper._CHUNK_SIZE
965
prefix_size = chunk_size - 2
966
u_prefix = "a" * prefix_size
967
prefix = bytes(u_prefix.encode("utf-8"))
968
self.assertEquals(len(u_prefix), len(prefix))
969
u_suffix = "\u8888\n"
970
suffix = bytes(u_suffix.encode("utf-8"))
971
line = prefix + suffix
972
f = io.open(test_support.TESTFN, "wb")
975
f = io.open(test_support.TESTFN, "r", encoding="utf-8")
976
s = f.read(prefix_size)
977
self.assertEquals(s, unicode(prefix, "ascii"))
978
self.assertEquals(f.tell(), prefix_size)
979
self.assertEquals(f.readline(), u_suffix)
981
def testSeekingToo(self):
982
# Regression test for a specific bug
983
data = b'\xe0\xbf\xbf\n'
984
f = io.open(test_support.TESTFN, "wb")
987
f = io.open(test_support.TESTFN, "r", encoding="utf-8")
988
f._CHUNK_SIZE # Just test that it exists
993
def testSeekAndTell(self):
994
"""Test seek/tell using the StatefulIncrementalDecoder."""
996
def testSeekAndTellWithData(data, min_pos=0):
997
"""Tell/seek to various points within a data stream and ensure
998
that the decoded data returned by read() is consistent."""
999
f = io.open(test_support.TESTFN, 'wb')
1002
f = io.open(test_support.TESTFN, encoding='test_decoder')
1006
for i in range(min_pos, len(decoded) + 1): # seek positions
1007
for j in [1, 5, len(decoded) - i]: # read lengths
1008
f = io.open(test_support.TESTFN, encoding='test_decoder')
1009
self.assertEquals(f.read(i), decoded[:i])
1011
self.assertEquals(f.read(j), decoded[i:i + j])
1013
self.assertEquals(f.read(), decoded[i:])
1016
# Enable the test decoder.
1017
StatefulIncrementalDecoder.codecEnabled = 1
1021
# Try each test case.
1022
for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
1023
testSeekAndTellWithData(input)
1025
# Position each test case so that it crosses a chunk boundary.
1026
CHUNK_SIZE = io.TextIOWrapper._CHUNK_SIZE
1027
for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
1028
offset = CHUNK_SIZE - len(input)//2
1029
prefix = b'.'*offset
1030
# Don't bother seeking into the prefix (takes too long).
1032
testSeekAndTellWithData(prefix + input, min_pos)
1034
# Ensure our test decoder won't interfere with subsequent tests.
1036
StatefulIncrementalDecoder.codecEnabled = 0
1038
def testEncodedWrites(self):
1039
data = u"1234567890"
1046
for encoding in tests:
1048
f = io.TextIOWrapper(buf, encoding=encoding)
1049
# Check if the BOM is written only once (see issue1753).
1053
self.assertEquals(f.read(), data * 2)
1054
self.assertEquals(buf.getvalue(), (data * 2).encode(encoding))
1056
def timingTest(self):
1059
line = "\0\x0f\xff\u0fff\uffff\U000fffff\U0010ffff"*3 + "\n"
1062
nbytes = len(line.encode(enc))
1063
for chunk_size in (32, 64, 128, 256):
1064
f = io.open(test_support.TESTFN, "w+", encoding=enc)
1065
f._CHUNK_SIZE = chunk_size
1067
for i in range(nlines):
1084
if test_support.verbose:
1085
print("\nTiming test: %d lines of %d characters (%d bytes)" %
1086
(nlines, nchars, nbytes))
1087
print("File chunk size: %6s" % f._CHUNK_SIZE)
1088
print("Writing: %6.3f seconds" % (t1-t0))
1089
print("Reading using iteration: %6.3f seconds" % (t2-t1))
1090
print("Reading using readline(): %6.3f seconds" % (t3-t2))
1091
print("Using readline()+tell(): %6.3f seconds" % (t4-t3))
1093
def testReadOneByOne(self):
1094
txt = io.TextIOWrapper(io.BytesIO(b"AA\r\nBB"))
1101
self.assertEquals(reads, "AA\nBB")
1103
# Read in amounts equal to TextIOWrapper._CHUNK_SIZE, which is 128.
1104
def testReadByChunk(self):
1105
# make sure "\r\n" straddles 128 char boundary.
1106
txt = io.TextIOWrapper(io.BytesIO(b"A" * 127 + b"\r\nB"))
1113
self.assertEquals(reads, "A"*127+"\nB")
1115
def test_issue1395_1(self):
1116
txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1118
# read one char at a time
1125
self.assertEquals(reads, self.normalized)
1127
def test_issue1395_2(self):
1128
txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1137
self.assertEquals(reads, self.normalized)
1139
def test_issue1395_3(self):
1140
txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1144
reads += txt.read(4)
1145
reads += txt.readline()
1146
reads += txt.readline()
1147
reads += txt.readline()
1148
self.assertEquals(reads, self.normalized)
1150
def test_issue1395_4(self):
1151
txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1156
self.assertEquals(reads, self.normalized)
1158
def test_issue1395_5(self):
1159
txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1166
self.assertEquals(txt.read(4), "BBB\n")
1168
def test_issue2282(self):
1169
buffer = io.BytesIO(self.testdata)
1170
txt = io.TextIOWrapper(buffer, encoding="ascii")
1172
self.assertEqual(buffer.seekable(), txt.seekable())
1174
def test_newline_decoder(self):
1176
decoder = codecs.getincrementaldecoder("utf-8")()
1177
decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
1179
self.assertEquals(decoder.decode(b'\xe8\xa2\x88'), u"\u8888")
1181
self.assertEquals(decoder.decode(b'\xe8'), u"")
1182
self.assertEquals(decoder.decode(b'\xa2'), u"")
1183
self.assertEquals(decoder.decode(b'\x88'), u"\u8888")
1185
self.assertEquals(decoder.decode(b'\xe8'), u"")
1186
self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
1188
decoder.setstate((b'', 0))
1189
self.assertEquals(decoder.decode(b'\n'), u"\n")
1190
self.assertEquals(decoder.decode(b'\r'), u"")
1191
self.assertEquals(decoder.decode(b'', final=True), u"\n")
1192
self.assertEquals(decoder.decode(b'\r', final=True), u"\n")
1194
self.assertEquals(decoder.decode(b'\r'), u"")
1195
self.assertEquals(decoder.decode(b'a'), u"\na")
1197
self.assertEquals(decoder.decode(b'\r\r\n'), u"\n\n")
1198
self.assertEquals(decoder.decode(b'\r'), u"")
1199
self.assertEquals(decoder.decode(b'\r'), u"\n")
1200
self.assertEquals(decoder.decode(b'\na'), u"\na")
1202
self.assertEquals(decoder.decode(b'\xe8\xa2\x88\r\n'), u"\u8888\n")
1203
self.assertEquals(decoder.decode(b'\xe8\xa2\x88'), u"\u8888")
1204
self.assertEquals(decoder.decode(b'\n'), u"\n")
1205
self.assertEquals(decoder.decode(b'\xe8\xa2\x88\r'), u"\u8888")
1206
self.assertEquals(decoder.decode(b'\n'), u"\n")
1208
decoder = codecs.getincrementaldecoder("utf-8")()
1209
decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
1210
self.assertEquals(decoder.newlines, None)
1211
decoder.decode(b"abc\n\r")
1212
self.assertEquals(decoder.newlines, u'\n')
1213
decoder.decode(b"\nabc")
1214
self.assertEquals(decoder.newlines, ('\n', '\r\n'))
1215
decoder.decode(b"abc\r")
1216
self.assertEquals(decoder.newlines, ('\n', '\r\n'))
1217
decoder.decode(b"abc")
1218
self.assertEquals(decoder.newlines, ('\r', '\n', '\r\n'))
1219
decoder.decode(b"abc\r")
1221
self.assertEquals(decoder.decode(b"abc"), "abc")
1222
self.assertEquals(decoder.newlines, None)
1224
# XXX Tests for open()
1226
class MiscIOTest(unittest.TestCase):
1228
def testImport__all__(self):
1229
for name in io.__all__:
1230
obj = getattr(io, name, None)
1231
self.assert_(obj is not None, name)
1234
elif "error" in name.lower():
1235
self.assert_(issubclass(obj, Exception), name)
1237
self.assert_(issubclass(obj, io.IOBase))
1241
test_support.run_unittest(IOTest, BytesIOTest, StringIOTest,
1242
BufferedReaderTest, BufferedWriterTest,
1243
BufferedRWPairTest, BufferedRandomTest,
1244
StatefulIncrementalDecoderTest,
1245
TextIOWrapperTest, MiscIOTest)
1247
if __name__ == "__main__":