1
"""Unit tests for streamio (new standard I/O)."""
6
from pypy.tool.udir import udir
8
from pypy.rlib import streamio
10
from pypy.rpython.test.tool import BaseRtypingTest, LLRtypeMixin, OORtypeMixin
13
class TSource(streamio.Stream):
15
def __init__(self, packets):
18
self.orig_packets = packets[:]
19
self.packets = packets[:]
26
def seek(self, offset, whence=0):
30
for packet in self.orig_packets:
34
self.packets = list(self.orig_packets)
36
while self.pos < offset:
37
data = self.read(offset - self.pos)
39
assert self.pos == offset
44
data = self.packets.pop(0)
48
data, rest = data[:n], data[n:]
49
self.packets.insert(0, rest)
50
self.chunks.append((n, len(data), self.pos))
57
class TReader(TSource):
62
class TWriter(streamio.Stream):
64
def __init__(self, data=''):
69
def write(self, data):
70
self.chunks.append((self.pos, data))
71
if self.pos >= len(self.buf):
72
self.buf += "\0" * (self.pos - len(self.buf)) + data
73
self.pos = len(self.buf)
77
self.buf = (self.buf[:start] + data +
78
self.buf[start + len(data):])
84
def seek(self, offset, whence=0):
90
offset += len(self.buf)
92
raise ValueError, "whence should be 0, 1 or 2"
100
def truncate(self, size=None):
103
if size <= len(self.buf):
104
self.buf = self.buf[:size]
106
self.buf += '\0' * (size - len(self.buf))
111
class TReaderWriter(TWriter):
113
def read(self, n=-1):
117
result = self.buf[start: ]
118
self.pos = len(self.buf)
120
if n > len(self.buf) - start:
121
n = len(self.buf) - start
124
result = self.buf[start: stop]
128
class BaseTestBufferingInputStreamTests(BaseRtypingTest):
130
packets = ["a", "b", "\n", "def", "\nxy\npq\nuv", "wx"]
131
lines = ["ab\n", "def\n", "xy\n", "pq\n", "uvwx"]
136
def makeStream(self, tell=False, seek=False, bufsize=-1):
137
base = TSource(self.packets)
140
raise NotImplementedError
146
return streamio.BufferingInputStream(base, bufsize)
148
def test_readline(self):
149
for file in [self.makeStream(), self.makeStream(bufsize=1)]:
157
result = result and self.lines[i] == r
160
res = self.interpret(f, [])
163
def test_readall(self):
164
file = self.makeStream()
166
return file.readall() == "".join(self.lines)
167
res = self.interpret(f, [])
170
def test_readall_small_bufsize(self):
171
file = self.makeStream(bufsize=1)
173
return file.readall() == "".join(self.lines)
174
res = self.interpret(f, [])
177
def test_readall_after_readline(self):
178
file = self.makeStream()
180
return (file.readline() == self.lines[0] and
181
file.readline() == self.lines[1] and
182
file.readall() == "".join(self.lines[2:]))
183
res = self.interpret(f, [])
186
def test_read_1_after_readline(self):
187
file = self.makeStream()
189
assert file.readline() == "ab\n"
190
assert file.readline() == "def\n"
197
assert file.read(0) == ""
198
return "".join(blocks) == "".join(self.lines)[7:]
199
res = self.interpret(f, [])
202
def test_read_1(self):
203
file = self.makeStream()
211
assert file.read(0) == ""
212
return "".join(blocks) == "".join(self.lines)
213
res = self.interpret(f, [])
216
def test_read_2(self):
217
file = self.makeStream()
225
assert file.read(0) == ""
226
return blocks == ["ab", "\nd", "ef", "\nx", "y\n", "pq",
228
res = self.interpret(f, [])
231
def test_read_4(self):
232
file = self.makeStream()
240
assert file.read(0) == ""
241
return blocks == ["ab\nd", "ef\nx", "y\npq", "\nuvw", "x"]
242
res = self.interpret(f, [])
245
def test_read_4_after_readline(self):
246
file = self.makeStream()
248
res = file.readline()
250
assert file.readline() == "def\n"
251
blocks = [file.read(4)]
257
assert file.read(0) == ""
258
return blocks == ["xy\np", "q\nuv", "wx"]
259
res = self.interpret(f, [])
262
def test_read_4_small_bufsize(self):
263
file = self.makeStream(bufsize=1)
271
return blocks == ["ab\nd", "ef\nx", "y\npq", "\nuvw", "x"]
272
res = self.interpret(f, [])
275
def test_tell_1(self):
276
file = self.makeStream(tell=True)
280
assert file.tell() == pos
281
n = len(file.read(1))
286
res = self.interpret(f, [])
289
def test_tell_1_after_readline(self):
290
file = self.makeStream(tell=True)
293
pos += len(file.readline())
294
assert file.tell() == pos
295
pos += len(file.readline())
296
assert file.tell() == pos
298
assert file.tell() == pos
299
n = len(file.read(1))
304
res = self.interpret(f, [])
307
def test_tell_2(self):
308
file = self.makeStream(tell=True)
312
assert file.tell() == pos
313
n = len(file.read(2))
318
res = self.interpret(f, [])
321
def test_tell_4(self):
322
file = self.makeStream(tell=True)
326
assert file.tell() == pos
327
n = len(file.read(4))
332
res = self.interpret(f, [])
335
def test_tell_readline(self):
336
file = self.makeStream(tell=True)
340
assert file.tell() == pos
341
n = len(file.readline())
346
res = self.interpret(f, [])
350
file = self.makeStream(tell=True, seek=True)
351
end = len(file.readall())
353
cases = [(readto, seekto, whence) for readto in range(0, end+1)
354
for seekto in range(0, end+1)
355
for whence in [0, 1, 2]]
356
random.shuffle(cases)
357
if isinstance(self, (LLRtypeMixin, OORtypeMixin)):
358
cases = cases[:7] # pick some cases at random - too slow!
361
assert end == len(all)
362
for readto, seekto, whence in cases:
364
assert file.tell() == 0
365
head = file.read(readto)
366
assert head == all[:readto]
368
offset = seekto - readto
370
offset = seekto - end
373
file.seek(offset, whence)
375
assert here == seekto
376
rest = file.readall()
377
assert rest == all[seekto:]
379
res = self.interpret(f, [])
382
def test_seek_noseek(self):
383
file = self.makeStream()
386
cases = [(readto, seekto, whence) for readto in range(0, end+1)
387
for seekto in range(readto, end+1)
388
for whence in [1, 2]]
389
random.shuffle(cases)
390
if isinstance(self, (LLRtypeMixin, OORtypeMixin)):
391
cases = cases[:7] # pick some cases at random - too slow!
393
for readto, seekto, whence in cases:
394
base = TSource(self.packets)
395
file = streamio.BufferingInputStream(base)
396
head = file.read(readto)
397
assert head == all[:readto]
398
offset = 42 # for the flow space
400
offset = seekto - readto
402
offset = seekto - end
403
file.seek(offset, whence)
404
rest = file.readall()
405
assert rest == all[seekto:]
407
res = self.interpret(f, [])
410
class TestBufferingInputStreamTests(BaseTestBufferingInputStreamTests):
411
def interpret(self, func, args, **kwds):
414
class TestBufferingInputStreamTestsLLinterp(BaseTestBufferingInputStreamTests,
418
class TestBufferingInputStreamTestsOOinterp(BaseTestBufferingInputStreamTests,
422
class TestBufferedRead:
423
packets = ["a", "b", "\n", "def", "\nxy\npq\nuv", "wx"]
424
lines = ["ab\n", "def\n", "xy\n", "pq\n", "uvwx"]
426
def makeStream(self, tell=False, seek=False, bufsize=-1):
427
base = TSource(self.packets)
430
raise NotImplementedError
435
return streamio.BufferingInputStream(base, bufsize)
437
def test_dont_read_small(self):
439
file = self.makeStream(bufsize=4)
440
while file.read(1): pass
441
for want, got, pos in self.source.chunks:
444
class BaseTestBufferingOutputStream(BaseRtypingTest):
446
def test_write(self):
449
filter = streamio.BufferingOutputStream(base, 4)
451
assert not base.chunks
452
assert filter.tell() == 3
454
filter.write("789ABCDEF")
456
assert filter.tell() == 19
458
assert base.buf == "123456789ABCDEF0123"
459
for chunk in base.chunks[:-1]:
460
assert len(chunk[1]) >= 4
461
self.interpret(f, [])
463
def test_write_seek(self):
466
filter = streamio.BufferingOutputStream(base, 4)
471
assert base.buf == "x"*3 + "y"*2 + "x"*1
472
self.interpret(f, [])
474
def test_write_seek_beyond_end(self):
475
"Linux behaviour. May be different on other platforms."
478
filter = streamio.BufferingOutputStream(base, 4)
482
assert base.buf == "\0"*3 + "y"*2
483
self.interpret(f, [])
485
def test_truncate(self):
486
"Linux behaviour. May be different on other platforms."
489
filter = streamio.BufferingOutputStream(base, 4)
494
assert base.buf == 'xy' + '\0' * 2
495
self.interpret(f, [])
497
def test_truncate2(self):
498
"Linux behaviour. May be different on other platforms."
501
filter = streamio.BufferingOutputStream(base, 4)
502
filter.write('12345678')
506
assert base.buf == '1234' + '\0' * 4 + 'y'
507
self.interpret(f, [])
509
class TestBufferingOutputStream(BaseTestBufferingOutputStream):
510
def interpret(self, func, args, **kwds):
513
class TestBufferingOutputStreamLLinterp(BaseTestBufferingOutputStream,
517
class TestBufferingOutputStreamOOinterp(BaseTestBufferingOutputStream,
522
class BaseTestLineBufferingOutputStream(BaseRtypingTest):
524
def test_write(self):
526
filter = streamio.LineBufferingOutputStream(base)
528
filter.bufsize = 4 # More handy for testing than the default
530
assert base.buf == ""
531
assert filter.tell() == 3
533
assert base.buf == "1234"
534
filter.write("789ABCDEF\n")
535
assert base.buf == "123456789ABCDEF\n"
537
assert base.buf == "123456789ABCDEF\n0123"
538
assert filter.tell() == 20
540
assert base.buf == "123456789ABCDEF\n0123"
541
self.interpret(f, [])
543
def test_write_seek(self):
545
filter = streamio.BufferingOutputStream(base, 4)
551
assert base.buf == "x"*3 + "y"*2 + "x"*1
552
self.interpret(f, [])
554
class TestLineBufferingOutputStream(BaseTestLineBufferingOutputStream):
555
def interpret(self, func, args, **kwds):
558
class TestLineBufferingOutputStreamLLinterp(BaseTestLineBufferingOutputStream,
562
class TestLineBufferingOutputStreamOOinterp(BaseTestLineBufferingOutputStream,
567
class BaseTestCRLFFilter(BaseRtypingTest):
569
def test_filter(self):
570
packets = ["abc\ndef\rghi\r\nxyz\r", "123\r", "\n456"]
571
expected = ["abc\ndef\nghi\nxyz\n", "123\n", "456"]
572
crlf = streamio.CRLFFilter(TSource(packets))
576
block = crlf.read(100)
580
assert blocks == expected
581
self.interpret(f, [])
583
class TestCRLFFilter(BaseTestCRLFFilter):
584
def interpret(self, func, args, **kwds):
587
class TestCRLFFilterLLinterp(BaseTestCRLFFilter, LLRtypeMixin):
590
class TestCRLFFilterOOinterp(BaseTestCRLFFilter, OORtypeMixin):
593
class TestMMapFile(BaseTestBufferingInputStreamTests):
598
def interpret(self, func, args, **kwargs):
601
def teardown_method(self, method):
607
except os.error, msg:
608
print "can't remove %s: %s" % (tfn, msg)
610
def makeStream(self, tell=None, seek=None, bufsize=-1, mode="r"):
615
mmapmode = mmap.ACCESS_READ
616
filemode = os.O_RDONLY
618
mmapmode |= mmap.ACCESS_WRITE
619
filemode |= os.O_WRONLY
620
self.teardown_method(None) # for tests calling makeStream() several time
621
self.tfn = str(udir.join('streamio%03d' % TestMMapFile.Counter))
622
TestMMapFile.Counter += 1
623
f = open(self.tfn, "wb")
624
f.writelines(self.packets)
626
self.fd = os.open(self.tfn, filemode)
627
return streamio.MMapFile(self.fd, mmapmode)
629
def test_write(self):
630
if os.name == "posix":
631
return # write() does't work on Unix :-(
632
file = self.makeStream(mode="w")
633
file.write("BooHoo\n")
635
file.writelines(["a\n", "b\n", "c\n"])
636
assert file.tell() == len("BooHoo\nBarf\na\nb\nc\n")
638
assert file.read() == "BooHoo\nBarf\na\nb\nc\n"
640
assert file.readlines() == (
641
["BooHoo\n", "Barf\n", "a\n", "b\n", "c\n"])
642
assert file.tell() == len("BooHoo\nBarf\na\nb\nc\n")
645
class BaseTestBufferingInputOutputStreamTests(BaseRtypingTest):
647
def test_write(self):
649
base = TReaderWriter()
650
filter = streamio.BufferingInputStream(
651
streamio.BufferingOutputStream(base, 4), 4)
653
filter.write("123456789")
654
for chunk in base.chunks:
655
assert len(chunk[1]) >= 4
656
s = filter.read(sys.maxint)
657
assert base.buf == "123456789"
660
assert not base.chunks
661
s = filter.read(sys.maxint)
662
assert base.buf == "123456789abc"
665
assert not base.chunks
667
assert base.buf == "123456789abc012"
668
assert filter.read(3) == "567"
671
assert base.buf == "1234567x9abc012"
672
self.interpret(f, [])
674
def test_write_seek_beyond_end(self):
675
"Linux behaviour. May be different on other platforms."
676
base = TReaderWriter()
677
filter = streamio.BufferingInputStream(
678
streamio.BufferingOutputStream(base, 4), 4)
683
assert base.buf == "\0"*3 + "y"*2
684
self.interpret(f, [])
686
class TestBufferingInputOutputStreamTests(
687
BaseTestBufferingInputOutputStreamTests):
688
def interpret(self, func, args):
691
class TestBufferingInputOutputStreamTestsLLinterp(
692
BaseTestBufferingInputOutputStreamTests, LLRtypeMixin):
695
class TestBufferingInputOutputStreamTestsOOinterp(
696
BaseTestBufferingInputOutputStreamTests, OORtypeMixin):
700
class BaseTestTextInputFilter(BaseRtypingTest):
709
"abc\ndef\rghi\r\nxyz",
718
("abc\ndef\nghi\nxyz", 30),
719
("\nuvw\npqr\n", 40),
725
expected_with_tell = [
730
("abc\ndef\nghi\nxyz", 30),
731
("\nuvw\npqr\n", 40),
737
expected_newlines = [
741
(["abcd\r"],[0]), # wrong, but requires precognition to fix
742
(["abcd\r", "\nefgh"], [0, 4]),
743
(["abcd", "\nefg\r", "hij", "k\r\n"], [0, 2, 3, 7]),
744
(["abcd", "\refg\r", "\nhij", "k\n"], [0, 1, 5, 7])
748
base = TReader(self.packets)
749
filter = streamio.TextInputFilter(base)
751
for data, pos in self.expected:
752
assert filter.read(100) == data
753
self.interpret(f, [])
755
def test_read_tell(self):
756
base = TReader(self.packets)
757
filter = streamio.TextInputFilter(base)
759
for data, pos in self.expected_with_tell:
760
assert filter.read(100) == data
761
assert filter.tell() == pos
762
assert filter.tell() == pos # Repeat the tell() !
763
self.interpret(f, [])
766
base = TReader(self.packets)
767
filter = streamio.TextInputFilter(base)
772
pairs.append((sofar, filter.tell()))
779
for i in range(len(pairs)):
780
sofar, pos = pairs[i]
782
assert filter.tell() == pos
783
assert filter.tell() == pos
786
data = filter.read(100)
788
assert filter.read(100) == ""
791
assert "".join(bufs) == all
792
self.interpret(f, [])
794
def test_newlines_attribute(self):
795
for packets, expected in self.expected_newlines:
796
base = TReader(packets)
797
filter = streamio.TextInputFilter(base)
801
assert filter.getnewlines() == e
802
self.interpret(f, [])
805
class TestTextInputFilter(BaseTestTextInputFilter):
806
def interpret(self, func, args):
809
class TestTextInputFilterLLinterp(BaseTestTextInputFilter, LLRtypeMixin):
812
class TestTextInputFilterOOinterp(BaseTestTextInputFilter, OORtypeMixin):
816
class BaseTestTextOutputFilter(BaseRtypingTest):
818
def test_write_nl(self):
821
filter = streamio.TextOutputFilter(base, linesep="\n")
823
filter.write("def\npqr\nuvw")
824
filter.write("\n123\n")
825
assert base.buf == "abcdef\npqr\nuvw\n123\n"
826
self.interpret(f, [])
828
def test_write_cr(self):
831
filter = streamio.TextOutputFilter(base, linesep="\r")
833
filter.write("def\npqr\nuvw")
834
filter.write("\n123\n")
835
assert base.buf == "abcdef\rpqr\ruvw\r123\r"
836
self.interpret(f, [])
838
def test_write_crnl(self):
841
filter = streamio.TextOutputFilter(base, linesep="\r\n")
843
filter.write("def\npqr\nuvw")
844
filter.write("\n123\n")
845
assert base.buf == "abcdef\r\npqr\r\nuvw\r\n123\r\n"
846
self.interpret(f, [])
848
def test_write_tell_nl(self):
851
filter = streamio.TextOutputFilter(base, linesep="\n")
853
assert filter.tell() == 3
854
filter.write("\nabc\n")
855
assert filter.tell() == 8
856
self.interpret(f, [])
858
def test_write_tell_cr(self):
861
filter = streamio.TextOutputFilter(base, linesep="\r")
863
assert filter.tell() == 3
864
filter.write("\nabc\n")
865
assert filter.tell() == 8
866
self.interpret(f, [])
868
def test_write_tell_crnl(self):
871
filter = streamio.TextOutputFilter(base, linesep="\r\n")
873
assert filter.tell() == 3
874
filter.write("\nabc\n")
875
assert filter.tell() == 10
876
self.interpret(f, [])
878
def test_write_seek(self):
881
filter = streamio.TextOutputFilter(base, linesep="\n")
882
filter.write("x"*100)
885
assert base.buf == "x"*50 + "y"*10 + "x"*40
886
self.interpret(f, [])
888
class TestTextOutputFilter(BaseTestTextOutputFilter):
889
def interpret(self, func, args):
892
class TestTextOutputFilterLLinterp(BaseTestTextOutputFilter, LLRtypeMixin):
895
class TestTextOutputFilterOOinterp(BaseTestTextOutputFilter, OORtypeMixin):
899
class TestDecodingInputFilter:
902
chars = u"abc\xff\u1234\u4321\x80xyz"
903
data = chars.encode("utf8")
904
base = TReader([data])
905
filter = streamio.DecodingInputFilter(base)
907
for n in range(1, 11):
910
assert type(c) == unicode
914
assert u"".join(bufs) == chars
916
class TestEncodingOutputFilterTests:
918
def test_write(self):
919
chars = u"abc\xff\u1234\u4321\x80xyz"
920
data = chars.encode("utf8")
921
for n in range(1, 11):
923
filter = streamio.EncodingOutputFilter(base)
931
assert base.buf == data
939
def timeit(fn=FN, opener=streamio.MMapFile):
943
for line in iter(f.readline, ""):
947
print "%d lines (%d bytes) in %.3f seconds for %s" % (
948
lines, bytes, t1-t0, opener.__name__)
951
def diskopen(fn, mode):
955
filemode = os.O_RDONLY
957
filemode |= os.O_WRONLY
959
fd = os.open(fn, filemode)
960
base = streamio.DiskFile(fd)
961
return streamio.BufferingInputStream(base)
962
def mmapopen(fn, mode):
967
mmapmode = mmap.ACCESS_READ
968
filemode = os.O_RDONLY
970
mmapmode |= mmap.ACCESS_WRITE
971
filemode |= os.O_WRONLY
972
fd = os.open(fn, filemode)
973
return streamio.MMapFile(fd, mmapmode)
974
timeit(opener=diskopen)
975
timeit(opener=mmapopen)