~ubuntu-branches/ubuntu/karmic/pypy/karmic

« back to all changes in this revision

Viewing changes to pypy/rlib/test/test_streamio.py

  • Committer: Bazaar Package Importer
  • Author(s): Alexandre Fayolle
  • Date: 2007-04-13 09:33:09 UTC
  • Revision ID: james.westby@ubuntu.com-20070413093309-yoojh4jcoocu2krz
Tags: upstream-1.0.0
ImportĀ upstreamĀ versionĀ 1.0.0

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
"""Unit tests for streamio (new standard I/O)."""
 
2
 
 
3
import os
 
4
import time
 
5
import random
 
6
from pypy.tool.udir import udir
 
7
 
 
8
from pypy.rlib import streamio
 
9
 
 
10
from pypy.rpython.test.tool import BaseRtypingTest, LLRtypeMixin, OORtypeMixin
 
11
 
 
12
 
 
13
class TSource(streamio.Stream):
 
14
 
 
15
    def __init__(self, packets):
 
16
        for x in packets:
 
17
            assert x
 
18
        self.orig_packets = packets[:]
 
19
        self.packets = packets[:]
 
20
        self.pos = 0
 
21
        self.chunks = []
 
22
 
 
23
    def tell(self):
 
24
        return self.pos
 
25
 
 
26
    def seek(self, offset, whence=0):
 
27
        if whence == 1:
 
28
            offset += self.pos
 
29
        elif whence == 2:
 
30
            for packet in self.orig_packets:
 
31
                offset += len(packet)
 
32
        else:
 
33
            assert whence == 0
 
34
        self.packets = list(self.orig_packets)
 
35
        self.pos = 0
 
36
        while self.pos < offset:
 
37
            data = self.read(offset - self.pos)
 
38
            assert data
 
39
        assert self.pos == offset
 
40
 
 
41
    def read(self, n):
 
42
        assert n >= 0
 
43
        try:
 
44
            data = self.packets.pop(0)
 
45
        except IndexError:
 
46
            return ""
 
47
        if len(data) > n:
 
48
            data, rest = data[:n], data[n:]
 
49
            self.packets.insert(0, rest)
 
50
        self.chunks.append((n, len(data), self.pos))
 
51
        self.pos += len(data)
 
52
        return data
 
53
 
 
54
    def close(self):
 
55
        pass
 
56
 
 
57
class TReader(TSource):
 
58
 
 
59
    def flush(self):
 
60
        pass
 
61
 
 
62
class TWriter(streamio.Stream):
 
63
 
 
64
    def __init__(self, data=''):
 
65
        self.buf = data
 
66
        self.chunks = []
 
67
        self.pos = 0
 
68
 
 
69
    def write(self, data):
 
70
        self.chunks.append((self.pos, data))
 
71
        if self.pos >= len(self.buf):
 
72
            self.buf += "\0" * (self.pos - len(self.buf)) + data
 
73
            self.pos = len(self.buf)
 
74
        else:
 
75
            start = self.pos
 
76
            assert start >= 0
 
77
            self.buf = (self.buf[:start] + data +
 
78
                        self.buf[start + len(data):])
 
79
            self.pos += len(data)
 
80
 
 
81
    def tell(self):
 
82
        return self.pos
 
83
 
 
84
    def seek(self, offset, whence=0):
 
85
        if whence == 0:
 
86
            pass
 
87
        elif whence == 1:
 
88
            offset += self.pos
 
89
        elif whence == 2:
 
90
            offset += len(self.buf)
 
91
        else:
 
92
            raise ValueError, "whence should be 0, 1 or 2"
 
93
        if offset < 0:
 
94
            offset = 0
 
95
        self.pos = offset
 
96
 
 
97
    def close(self):
 
98
        pass
 
99
 
 
100
    def truncate(self, size=None):
 
101
        if size is None:
 
102
            size = self.pos
 
103
        if size <= len(self.buf):
 
104
            self.buf = self.buf[:size]
 
105
        else:
 
106
            self.buf += '\0' * (size - len(self.buf))
 
107
 
 
108
    def flush(self):
 
109
        pass
 
110
            
 
111
class TReaderWriter(TWriter):
 
112
 
 
113
    def read(self, n=-1):
 
114
        start = self.pos
 
115
        assert start >= 0
 
116
        if n < 1:
 
117
            result = self.buf[start: ]
 
118
            self.pos = len(self.buf)
 
119
        else:
 
120
            if n > len(self.buf) - start:
 
121
                n = len(self.buf) - start
 
122
            stop = start + n
 
123
            assert stop >= 0
 
124
            result = self.buf[start: stop]
 
125
            self.pos += n
 
126
        return result
 
127
    
 
128
class BaseTestBufferingInputStreamTests(BaseRtypingTest):
 
129
 
 
130
    packets = ["a", "b", "\n", "def", "\nxy\npq\nuv", "wx"]
 
131
    lines = ["ab\n", "def\n", "xy\n", "pq\n", "uvwx"]
 
132
 
 
133
    def _freeze_(self):
 
134
        return True
 
135
 
 
136
    def makeStream(self, tell=False, seek=False, bufsize=-1):
 
137
        base = TSource(self.packets)
 
138
        self.source = base
 
139
        def f(*args):
 
140
            raise NotImplementedError
 
141
        if not tell:
 
142
            base.tell = f
 
143
        if not seek:
 
144
            base.seek = f
 
145
 
 
146
        return streamio.BufferingInputStream(base, bufsize)
 
147
 
 
148
    def test_readline(self):
 
149
        for file in [self.makeStream(), self.makeStream(bufsize=1)]:
 
150
            def f():
 
151
                i = 0
 
152
                result = True
 
153
                while 1:
 
154
                    r = file.readline()
 
155
                    if r == "":
 
156
                        break
 
157
                    result = result and self.lines[i] == r
 
158
                    i += 1
 
159
                return result
 
160
            res = self.interpret(f, [])
 
161
            assert res
 
162
 
 
163
    def test_readall(self):
 
164
        file = self.makeStream()
 
165
        def f():
 
166
            return file.readall() == "".join(self.lines)
 
167
        res = self.interpret(f, [])
 
168
        assert res
 
169
 
 
170
    def test_readall_small_bufsize(self):
 
171
        file = self.makeStream(bufsize=1)
 
172
        def f():
 
173
            return file.readall() == "".join(self.lines)
 
174
        res = self.interpret(f, [])
 
175
        assert res
 
176
 
 
177
    def test_readall_after_readline(self):
 
178
        file = self.makeStream()
 
179
        def f():
 
180
            return (file.readline() == self.lines[0] and
 
181
                    file.readline() == self.lines[1] and
 
182
                    file.readall() == "".join(self.lines[2:]))
 
183
        res = self.interpret(f, [])
 
184
        assert res
 
185
 
 
186
    def test_read_1_after_readline(self):
 
187
        file = self.makeStream()
 
188
        def f():
 
189
            assert file.readline() == "ab\n"
 
190
            assert file.readline() == "def\n"
 
191
            blocks = []
 
192
            while 1:
 
193
                block = file.read(1)
 
194
                if not block:
 
195
                    break
 
196
                blocks.append(block)
 
197
                assert file.read(0) == ""
 
198
            return "".join(blocks) == "".join(self.lines)[7:]
 
199
        res = self.interpret(f, [])
 
200
        assert res
 
201
 
 
202
    def test_read_1(self):
 
203
        file = self.makeStream()
 
204
        def f():
 
205
            blocks = []
 
206
            while 1:
 
207
                block = file.read(1)
 
208
                if not block:
 
209
                    break
 
210
                blocks.append(block)
 
211
                assert file.read(0) == ""
 
212
            return "".join(blocks) == "".join(self.lines)
 
213
        res = self.interpret(f, [])
 
214
        assert res
 
215
 
 
216
    def test_read_2(self):
 
217
        file = self.makeStream()
 
218
        def f():
 
219
            blocks = []
 
220
            while 1:
 
221
                block = file.read(2)
 
222
                if not block:
 
223
                    break
 
224
                blocks.append(block)
 
225
                assert file.read(0) == ""
 
226
            return blocks == ["ab", "\nd", "ef", "\nx", "y\n", "pq",
 
227
                              "\nu", "vw", "x"]
 
228
        res = self.interpret(f, [])
 
229
        assert res
 
230
 
 
231
    def test_read_4(self):
 
232
        file = self.makeStream()
 
233
        def f():
 
234
            blocks = []
 
235
            while 1:
 
236
                block = file.read(4)
 
237
                if not block:
 
238
                    break
 
239
                blocks.append(block)
 
240
                assert file.read(0) == ""
 
241
            return blocks == ["ab\nd", "ef\nx", "y\npq", "\nuvw", "x"]
 
242
        res = self.interpret(f, [])
 
243
        assert res
 
244
        
 
245
    def test_read_4_after_readline(self):
 
246
        file = self.makeStream()
 
247
        def f():
 
248
            res = file.readline()
 
249
            assert res == "ab\n"
 
250
            assert file.readline() == "def\n"
 
251
            blocks = [file.read(4)]
 
252
            while 1:
 
253
                block = file.read(4)
 
254
                if not block:
 
255
                    break
 
256
                blocks.append(block)
 
257
                assert file.read(0) == ""
 
258
            return blocks == ["xy\np", "q\nuv", "wx"]
 
259
        res = self.interpret(f, [])
 
260
        assert res
 
261
 
 
262
    def test_read_4_small_bufsize(self):
 
263
        file = self.makeStream(bufsize=1)
 
264
        def f():
 
265
            blocks = []
 
266
            while 1:
 
267
                block = file.read(4)
 
268
                if not block:
 
269
                    break
 
270
                blocks.append(block)
 
271
            return blocks == ["ab\nd", "ef\nx", "y\npq", "\nuvw", "x"]
 
272
        res = self.interpret(f, [])
 
273
        assert res
 
274
 
 
275
    def test_tell_1(self):
 
276
        file = self.makeStream(tell=True)
 
277
        def f():
 
278
            pos = 0
 
279
            while 1:
 
280
                assert file.tell() == pos
 
281
                n = len(file.read(1))
 
282
                if not n:
 
283
                    break
 
284
                pos += n
 
285
            return True
 
286
        res = self.interpret(f, [])
 
287
        assert res
 
288
 
 
289
    def test_tell_1_after_readline(self):
 
290
        file = self.makeStream(tell=True)
 
291
        def f():
 
292
            pos = 0
 
293
            pos += len(file.readline())
 
294
            assert file.tell() == pos
 
295
            pos += len(file.readline())
 
296
            assert file.tell() == pos
 
297
            while 1:
 
298
                assert file.tell() == pos
 
299
                n = len(file.read(1))
 
300
                if not n:
 
301
                    break
 
302
                pos += n
 
303
            return True
 
304
        res = self.interpret(f, [])
 
305
        assert res
 
306
 
 
307
    def test_tell_2(self):
 
308
        file = self.makeStream(tell=True)
 
309
        def f():
 
310
            pos = 0
 
311
            while 1:
 
312
                assert file.tell() == pos
 
313
                n = len(file.read(2))
 
314
                if not n:
 
315
                    break
 
316
                pos += n
 
317
            return True
 
318
        res = self.interpret(f, [])
 
319
        assert res
 
320
 
 
321
    def test_tell_4(self):
 
322
        file = self.makeStream(tell=True)
 
323
        def f():
 
324
            pos = 0
 
325
            while 1:
 
326
                assert file.tell() == pos
 
327
                n = len(file.read(4))
 
328
                if not n:
 
329
                    break
 
330
                pos += n
 
331
            return True
 
332
        res = self.interpret(f, [])
 
333
        assert res
 
334
 
 
335
    def test_tell_readline(self):
 
336
        file = self.makeStream(tell=True)
 
337
        def f():
 
338
            pos = 0
 
339
            while 1:
 
340
                assert file.tell() == pos
 
341
                n = len(file.readline())
 
342
                if not n:
 
343
                    break
 
344
                pos += n
 
345
            return True
 
346
        res = self.interpret(f, [])
 
347
        assert res
 
348
 
 
349
    def test_seek(self):
 
350
        file = self.makeStream(tell=True, seek=True)
 
351
        end = len(file.readall())
 
352
        file.seek(0, 0)
 
353
        cases = [(readto, seekto, whence) for readto in range(0, end+1)
 
354
                                          for seekto in range(0, end+1)
 
355
                                          for whence in [0, 1, 2]]
 
356
        random.shuffle(cases)
 
357
        if isinstance(self, (LLRtypeMixin, OORtypeMixin)):
 
358
            cases = cases[:7]      # pick some cases at random - too slow!
 
359
        def f():
 
360
            all = file.readall()
 
361
            assert end == len(all)
 
362
            for readto, seekto, whence in cases:
 
363
                file.seek(0, 0)
 
364
                assert file.tell() == 0
 
365
                head = file.read(readto)
 
366
                assert head == all[:readto]
 
367
                if whence == 1:
 
368
                    offset = seekto - readto
 
369
                elif whence == 2:
 
370
                    offset = seekto - end
 
371
                else:
 
372
                    offset = seekto
 
373
                file.seek(offset, whence)
 
374
                here = file.tell()
 
375
                assert here == seekto
 
376
                rest = file.readall()
 
377
                assert rest == all[seekto:]
 
378
            return True
 
379
        res = self.interpret(f, [])
 
380
        assert res
 
381
 
 
382
    def test_seek_noseek(self):
 
383
        file = self.makeStream()
 
384
        all = file.readall()
 
385
        end = len(all)
 
386
        cases = [(readto, seekto, whence) for readto in range(0, end+1)
 
387
                                          for seekto in range(readto, end+1)
 
388
                                          for whence in [1, 2]]
 
389
        random.shuffle(cases)
 
390
        if isinstance(self, (LLRtypeMixin, OORtypeMixin)):
 
391
            cases = cases[:7]      # pick some cases at random - too slow!
 
392
        def f():
 
393
            for readto, seekto, whence in cases:
 
394
                base = TSource(self.packets)
 
395
                file = streamio.BufferingInputStream(base)
 
396
                head = file.read(readto)
 
397
                assert head == all[:readto]
 
398
                offset = 42 # for the flow space
 
399
                if whence == 1:
 
400
                    offset = seekto - readto
 
401
                elif whence == 2:
 
402
                    offset = seekto - end
 
403
                file.seek(offset, whence)
 
404
                rest = file.readall()
 
405
                assert rest == all[seekto:]
 
406
            return True
 
407
        res = self.interpret(f, [])
 
408
        assert res
 
409
 
 
410
class TestBufferingInputStreamTests(BaseTestBufferingInputStreamTests):
 
411
    def interpret(self, func, args, **kwds):
 
412
        return func(*args)
 
413
 
 
414
class TestBufferingInputStreamTestsLLinterp(BaseTestBufferingInputStreamTests,
 
415
                                            LLRtypeMixin):
 
416
    pass
 
417
 
 
418
class TestBufferingInputStreamTestsOOinterp(BaseTestBufferingInputStreamTests,
 
419
                                            OORtypeMixin):
 
420
    pass
 
421
 
 
422
class TestBufferedRead:
 
423
    packets = ["a", "b", "\n", "def", "\nxy\npq\nuv", "wx"]
 
424
    lines = ["ab\n", "def\n", "xy\n", "pq\n", "uvwx"]
 
425
 
 
426
    def makeStream(self, tell=False, seek=False, bufsize=-1):
 
427
        base = TSource(self.packets)
 
428
        self.source = base
 
429
        def f(*args):
 
430
            raise NotImplementedError
 
431
        if not tell:
 
432
            base.tell = f
 
433
        if not seek:
 
434
            base.seek = f
 
435
        return streamio.BufferingInputStream(base, bufsize)
 
436
 
 
437
    def test_dont_read_small(self):
 
438
        import sys
 
439
        file = self.makeStream(bufsize=4)
 
440
        while file.read(1): pass
 
441
        for want, got, pos in self.source.chunks:
 
442
            assert want >= 4
 
443
 
 
444
class BaseTestBufferingOutputStream(BaseRtypingTest):
 
445
 
 
446
    def test_write(self):
 
447
        def f():
 
448
            base = TWriter()
 
449
            filter = streamio.BufferingOutputStream(base, 4)
 
450
            filter.write("123")
 
451
            assert not base.chunks
 
452
            assert filter.tell() == 3
 
453
            filter.write("456")
 
454
            filter.write("789ABCDEF")
 
455
            filter.write("0123")
 
456
            assert filter.tell() == 19
 
457
            filter.close()
 
458
            assert base.buf == "123456789ABCDEF0123"
 
459
            for chunk in base.chunks[:-1]:
 
460
                assert len(chunk[1]) >= 4
 
461
        self.interpret(f, [])
 
462
 
 
463
    def test_write_seek(self):
 
464
        def f():
 
465
            base = TWriter()
 
466
            filter = streamio.BufferingOutputStream(base, 4)
 
467
            filter.write("x"*6)
 
468
            filter.seek(3, 0)
 
469
            filter.write("y"*2)
 
470
            filter.close()
 
471
            assert base.buf == "x"*3 + "y"*2 + "x"*1
 
472
        self.interpret(f, [])
 
473
 
 
474
    def test_write_seek_beyond_end(self):
 
475
        "Linux behaviour. May be different on other platforms."
 
476
        def f():
 
477
            base = TWriter()
 
478
            filter = streamio.BufferingOutputStream(base, 4)
 
479
            filter.seek(3, 0)
 
480
            filter.write("y"*2)
 
481
            filter.close()
 
482
            assert base.buf == "\0"*3 + "y"*2
 
483
        self.interpret(f, [])
 
484
 
 
485
    def test_truncate(self):
 
486
        "Linux behaviour. May be different on other platforms."
 
487
        def f():
 
488
            base = TWriter()
 
489
            filter = streamio.BufferingOutputStream(base, 4)
 
490
            filter.write('x')
 
491
            filter.truncate(4)
 
492
            filter.write('y')
 
493
            filter.close()
 
494
            assert base.buf == 'xy' + '\0' * 2
 
495
        self.interpret(f, [])
 
496
 
 
497
    def test_truncate2(self):
 
498
        "Linux behaviour. May be different on other platforms."
 
499
        def f():
 
500
            base = TWriter()
 
501
            filter = streamio.BufferingOutputStream(base, 4)
 
502
            filter.write('12345678')
 
503
            filter.truncate(4)
 
504
            filter.write('y')
 
505
            filter.close()
 
506
            assert base.buf == '1234' + '\0' * 4 + 'y'
 
507
        self.interpret(f, [])
 
508
 
 
509
class TestBufferingOutputStream(BaseTestBufferingOutputStream):
 
510
    def interpret(self, func, args, **kwds):
 
511
        return func(*args)
 
512
 
 
513
class TestBufferingOutputStreamLLinterp(BaseTestBufferingOutputStream,
 
514
                                        LLRtypeMixin):
 
515
    pass
 
516
 
 
517
class TestBufferingOutputStreamOOinterp(BaseTestBufferingOutputStream,
 
518
                                        OORtypeMixin):
 
519
    pass
 
520
 
 
521
 
 
522
class BaseTestLineBufferingOutputStream(BaseRtypingTest):
 
523
 
 
524
    def test_write(self):
 
525
        base = TWriter()
 
526
        filter = streamio.LineBufferingOutputStream(base)
 
527
        def f():
 
528
            filter.bufsize = 4 # More handy for testing than the default
 
529
            filter.write("123")
 
530
            assert base.buf == ""
 
531
            assert filter.tell() == 3
 
532
            filter.write("456")
 
533
            assert base.buf == "1234"
 
534
            filter.write("789ABCDEF\n")
 
535
            assert base.buf == "123456789ABCDEF\n"
 
536
            filter.write("0123")
 
537
            assert base.buf == "123456789ABCDEF\n0123"
 
538
            assert filter.tell() == 20
 
539
            filter.close()
 
540
            assert base.buf == "123456789ABCDEF\n0123"
 
541
        self.interpret(f, [])
 
542
 
 
543
    def test_write_seek(self):
 
544
        base = TWriter()
 
545
        filter = streamio.BufferingOutputStream(base, 4)
 
546
        def f():
 
547
            filter.write("x"*6)
 
548
            filter.seek(3, 0)
 
549
            filter.write("y"*2)
 
550
            filter.close()
 
551
            assert base.buf == "x"*3 + "y"*2 + "x"*1
 
552
        self.interpret(f, [])
 
553
 
 
554
class TestLineBufferingOutputStream(BaseTestLineBufferingOutputStream):
 
555
    def interpret(self, func, args, **kwds):
 
556
        return func(*args)
 
557
 
 
558
class TestLineBufferingOutputStreamLLinterp(BaseTestLineBufferingOutputStream,
 
559
                                            LLRtypeMixin):
 
560
    pass
 
561
 
 
562
class TestLineBufferingOutputStreamOOinterp(BaseTestLineBufferingOutputStream,
 
563
                                            OORtypeMixin):
 
564
    pass
 
565
 
 
566
 
 
567
class BaseTestCRLFFilter(BaseRtypingTest):
 
568
 
 
569
    def test_filter(self):
 
570
        packets = ["abc\ndef\rghi\r\nxyz\r", "123\r", "\n456"]
 
571
        expected = ["abc\ndef\nghi\nxyz\n", "123\n", "456"]
 
572
        crlf = streamio.CRLFFilter(TSource(packets))
 
573
        def f():
 
574
            blocks = []
 
575
            while 1:
 
576
                block = crlf.read(100)
 
577
                if not block:
 
578
                    break
 
579
                blocks.append(block)
 
580
            assert blocks == expected
 
581
        self.interpret(f, [])
 
582
 
 
583
class TestCRLFFilter(BaseTestCRLFFilter):
 
584
    def interpret(self, func, args, **kwds):
 
585
        return func(*args)
 
586
 
 
587
class TestCRLFFilterLLinterp(BaseTestCRLFFilter, LLRtypeMixin):
 
588
    pass
 
589
 
 
590
class TestCRLFFilterOOinterp(BaseTestCRLFFilter, OORtypeMixin):
 
591
    pass
 
592
 
 
593
class TestMMapFile(BaseTestBufferingInputStreamTests):
 
594
    tfn = None
 
595
    fd = None
 
596
    Counter = 0
 
597
 
 
598
    def interpret(self, func, args, **kwargs):
 
599
        return func(*args)
 
600
 
 
601
    def teardown_method(self, method):
 
602
        tfn = self.tfn
 
603
        if tfn:
 
604
            self.tfn = None
 
605
            try:
 
606
                os.remove(tfn)
 
607
            except os.error, msg:
 
608
                print "can't remove %s: %s" % (tfn, msg)
 
609
 
 
610
    def makeStream(self, tell=None, seek=None, bufsize=-1, mode="r"):
 
611
        mmapmode = 0
 
612
        filemode = 0
 
613
        import mmap
 
614
        if "r" in mode:
 
615
            mmapmode = mmap.ACCESS_READ
 
616
            filemode = os.O_RDONLY
 
617
        if "w" in mode:
 
618
            mmapmode |= mmap.ACCESS_WRITE
 
619
            filemode |= os.O_WRONLY
 
620
        self.teardown_method(None) # for tests calling makeStream() several time
 
621
        self.tfn = str(udir.join('streamio%03d' % TestMMapFile.Counter))
 
622
        TestMMapFile.Counter += 1
 
623
        f = open(self.tfn, "wb")
 
624
        f.writelines(self.packets)
 
625
        f.close()
 
626
        self.fd = os.open(self.tfn, filemode)
 
627
        return streamio.MMapFile(self.fd, mmapmode)
 
628
 
 
629
    def test_write(self):
 
630
        if os.name == "posix":
 
631
            return # write() does't work on Unix :-(
 
632
        file = self.makeStream(mode="w")
 
633
        file.write("BooHoo\n")
 
634
        file.write("Barf\n")
 
635
        file.writelines(["a\n", "b\n", "c\n"])
 
636
        assert file.tell() == len("BooHoo\nBarf\na\nb\nc\n")
 
637
        file.seek(0, 0)
 
638
        assert file.read() == "BooHoo\nBarf\na\nb\nc\n"
 
639
        file.seek(0, 0)
 
640
        assert file.readlines() == (
 
641
                         ["BooHoo\n", "Barf\n", "a\n", "b\n", "c\n"])
 
642
        assert file.tell() == len("BooHoo\nBarf\na\nb\nc\n")
 
643
 
 
644
 
 
645
class BaseTestBufferingInputOutputStreamTests(BaseRtypingTest):
 
646
 
 
647
    def test_write(self):
 
648
        import sys
 
649
        base = TReaderWriter()
 
650
        filter = streamio.BufferingInputStream(
 
651
                streamio.BufferingOutputStream(base, 4), 4)
 
652
        def f():
 
653
            filter.write("123456789")
 
654
            for chunk in base.chunks:
 
655
                assert len(chunk[1]) >= 4
 
656
            s = filter.read(sys.maxint)
 
657
            assert base.buf == "123456789"
 
658
            base.chunks = []
 
659
            filter.write("abc")
 
660
            assert not base.chunks
 
661
            s = filter.read(sys.maxint)
 
662
            assert base.buf == "123456789abc"
 
663
            base.chunks = []
 
664
            filter.write("012")
 
665
            assert not base.chunks
 
666
            filter.seek(4, 0)
 
667
            assert base.buf == "123456789abc012"
 
668
            assert filter.read(3) == "567"
 
669
            filter.write('x')
 
670
            filter.flush()
 
671
            assert base.buf == "1234567x9abc012"
 
672
        self.interpret(f, [])
 
673
 
 
674
    def test_write_seek_beyond_end(self):
 
675
        "Linux behaviour. May be different on other platforms."
 
676
        base = TReaderWriter()
 
677
        filter = streamio.BufferingInputStream(
 
678
            streamio.BufferingOutputStream(base, 4), 4)
 
679
        def f():
 
680
            filter.seek(3, 0)
 
681
            filter.write("y"*2)
 
682
            filter.close()
 
683
            assert base.buf == "\0"*3 + "y"*2
 
684
        self.interpret(f, [])
 
685
 
 
686
class TestBufferingInputOutputStreamTests(
 
687
        BaseTestBufferingInputOutputStreamTests):
 
688
    def interpret(self, func, args):
 
689
        return func(*args)
 
690
 
 
691
class TestBufferingInputOutputStreamTestsLLinterp(
 
692
        BaseTestBufferingInputOutputStreamTests, LLRtypeMixin):
 
693
    pass
 
694
 
 
695
class TestBufferingInputOutputStreamTestsOOinterp(
 
696
        BaseTestBufferingInputOutputStreamTests, OORtypeMixin):
 
697
    pass
 
698
 
 
699
 
 
700
class BaseTestTextInputFilter(BaseRtypingTest):
 
701
 
 
702
    def _freeze_(self):
 
703
        return True
 
704
 
 
705
    packets = [
 
706
        "foo\r",
 
707
        "bar\r",
 
708
        "\nfoo\r\n",
 
709
        "abc\ndef\rghi\r\nxyz",
 
710
        "\nuvw\npqr\r",
 
711
        "\n",
 
712
        "abc\n",
 
713
        ]
 
714
    expected = [
 
715
        ("foo\n", 4),
 
716
        ("bar\n", 9),
 
717
        ("foo\n", 14),
 
718
        ("abc\ndef\nghi\nxyz", 30),
 
719
        ("\nuvw\npqr\n", 40),
 
720
        ("abc\n", 44),
 
721
        ("", 44),
 
722
        ("", 44),
 
723
        ]
 
724
 
 
725
    expected_with_tell = [
 
726
        ("foo\n", 4),
 
727
        ("b", 5),
 
728
        ("ar\n", 9),
 
729
        ("foo\n", 14),
 
730
        ("abc\ndef\nghi\nxyz", 30),
 
731
        ("\nuvw\npqr\n", 40),
 
732
        ("abc\n", 44),
 
733
        ("", 44),
 
734
        ("", 44),
 
735
        ]
 
736
 
 
737
    expected_newlines = [
 
738
        (["abcd"], [0]),
 
739
        (["abcd\n"], [2]),
 
740
        (["abcd\r\n"],[4]),
 
741
        (["abcd\r"],[0]), # wrong, but requires precognition to fix
 
742
        (["abcd\r", "\nefgh"], [0, 4]),
 
743
        (["abcd", "\nefg\r", "hij", "k\r\n"], [0, 2, 3, 7]),
 
744
        (["abcd", "\refg\r", "\nhij", "k\n"], [0, 1, 5, 7])
 
745
        ]
 
746
 
 
747
    def test_read(self):
 
748
        base = TReader(self.packets)
 
749
        filter = streamio.TextInputFilter(base)
 
750
        def f():
 
751
            for data, pos in self.expected:
 
752
                assert filter.read(100) == data
 
753
        self.interpret(f, [])
 
754
 
 
755
    def test_read_tell(self):
 
756
        base = TReader(self.packets)
 
757
        filter = streamio.TextInputFilter(base)
 
758
        def f():
 
759
            for data, pos in self.expected_with_tell:
 
760
                assert filter.read(100) == data
 
761
                assert filter.tell() == pos
 
762
                assert filter.tell() == pos # Repeat the tell() !
 
763
        self.interpret(f, [])
 
764
 
 
765
    def test_seek(self):
 
766
        base = TReader(self.packets)
 
767
        filter = streamio.TextInputFilter(base)
 
768
        def f():
 
769
            sofar = ""
 
770
            pairs = []
 
771
            while True:
 
772
                pairs.append((sofar, filter.tell()))
 
773
                c = filter.read(1)
 
774
                if not c:
 
775
                    break
 
776
                assert len(c) == 1
 
777
                sofar += c
 
778
            all = sofar
 
779
            for i in range(len(pairs)):
 
780
                sofar, pos = pairs[i]
 
781
                filter.seek(pos, 0)
 
782
                assert filter.tell() == pos
 
783
                assert filter.tell() == pos
 
784
                bufs = [sofar]
 
785
                while True:
 
786
                    data = filter.read(100)
 
787
                    if not data:
 
788
                        assert filter.read(100) == ""
 
789
                        break
 
790
                    bufs.append(data)
 
791
                assert "".join(bufs) == all
 
792
        self.interpret(f, [])
 
793
 
 
794
    def test_newlines_attribute(self):
 
795
        for packets, expected in self.expected_newlines:
 
796
            base = TReader(packets)
 
797
            filter = streamio.TextInputFilter(base)
 
798
            def f():
 
799
                for e in expected:
 
800
                    filter.read(100)
 
801
                    assert filter.getnewlines() == e
 
802
            self.interpret(f, [])
 
803
 
 
804
    
 
805
class TestTextInputFilter(BaseTestTextInputFilter):
 
806
    def interpret(self, func, args):
 
807
        return func(*args)
 
808
 
 
809
class TestTextInputFilterLLinterp(BaseTestTextInputFilter, LLRtypeMixin):
 
810
    pass
 
811
 
 
812
class TestTextInputFilterOOinterp(BaseTestTextInputFilter, OORtypeMixin):
 
813
    pass
 
814
 
 
815
 
 
816
class BaseTestTextOutputFilter(BaseRtypingTest):
 
817
 
 
818
    def test_write_nl(self):
 
819
        def f():
 
820
            base = TWriter()
 
821
            filter = streamio.TextOutputFilter(base, linesep="\n")
 
822
            filter.write("abc")
 
823
            filter.write("def\npqr\nuvw")
 
824
            filter.write("\n123\n")
 
825
            assert base.buf == "abcdef\npqr\nuvw\n123\n"
 
826
        self.interpret(f, [])
 
827
 
 
828
    def test_write_cr(self):
 
829
        def f():
 
830
            base = TWriter()
 
831
            filter = streamio.TextOutputFilter(base, linesep="\r")
 
832
            filter.write("abc")
 
833
            filter.write("def\npqr\nuvw")
 
834
            filter.write("\n123\n")
 
835
            assert base.buf == "abcdef\rpqr\ruvw\r123\r"
 
836
        self.interpret(f, [])
 
837
 
 
838
    def test_write_crnl(self):
 
839
        def f():
 
840
            base = TWriter()
 
841
            filter = streamio.TextOutputFilter(base, linesep="\r\n")
 
842
            filter.write("abc")
 
843
            filter.write("def\npqr\nuvw")
 
844
            filter.write("\n123\n")
 
845
            assert base.buf == "abcdef\r\npqr\r\nuvw\r\n123\r\n"
 
846
        self.interpret(f, [])
 
847
 
 
848
    def test_write_tell_nl(self):
 
849
        def f():
 
850
            base = TWriter()
 
851
            filter = streamio.TextOutputFilter(base, linesep="\n")
 
852
            filter.write("xxx")
 
853
            assert filter.tell() == 3
 
854
            filter.write("\nabc\n")
 
855
            assert filter.tell() == 8
 
856
        self.interpret(f, [])
 
857
 
 
858
    def test_write_tell_cr(self):
 
859
        def f():
 
860
            base = TWriter()
 
861
            filter = streamio.TextOutputFilter(base, linesep="\r")
 
862
            filter.write("xxx")
 
863
            assert filter.tell() == 3
 
864
            filter.write("\nabc\n")
 
865
            assert filter.tell() == 8
 
866
        self.interpret(f, [])
 
867
 
 
868
    def test_write_tell_crnl(self):
 
869
        def f():
 
870
            base = TWriter()
 
871
            filter = streamio.TextOutputFilter(base, linesep="\r\n")
 
872
            filter.write("xxx")
 
873
            assert filter.tell() == 3
 
874
            filter.write("\nabc\n")
 
875
            assert filter.tell() == 10
 
876
        self.interpret(f, [])
 
877
 
 
878
    def test_write_seek(self):
 
879
        def f():
 
880
            base = TWriter()
 
881
            filter = streamio.TextOutputFilter(base, linesep="\n")
 
882
            filter.write("x"*100)
 
883
            filter.seek(50, 0)
 
884
            filter.write("y"*10)
 
885
            assert base.buf == "x"*50 + "y"*10 + "x"*40
 
886
        self.interpret(f, [])
 
887
 
 
888
class TestTextOutputFilter(BaseTestTextOutputFilter):
 
889
    def interpret(self, func, args):
 
890
        return func(*args)
 
891
 
 
892
class TestTextOutputFilterLLinterp(BaseTestTextOutputFilter, LLRtypeMixin):
 
893
    pass
 
894
 
 
895
class TestTextOutputFilterOOinterp(BaseTestTextOutputFilter, OORtypeMixin):
 
896
    pass
 
897
 
 
898
 
 
899
class TestDecodingInputFilter:
 
900
 
 
901
    def test_read(self):
 
902
        chars = u"abc\xff\u1234\u4321\x80xyz"
 
903
        data = chars.encode("utf8")
 
904
        base = TReader([data])
 
905
        filter = streamio.DecodingInputFilter(base)
 
906
        bufs = []
 
907
        for n in range(1, 11):
 
908
            while 1:
 
909
                c = filter.read(n)
 
910
                assert type(c) == unicode
 
911
                if not c:
 
912
                    break
 
913
                bufs.append(c)
 
914
            assert u"".join(bufs) == chars
 
915
 
 
916
class TestEncodingOutputFilterTests: 
 
917
 
 
918
    def test_write(self):
 
919
        chars = u"abc\xff\u1234\u4321\x80xyz"
 
920
        data = chars.encode("utf8")
 
921
        for n in range(1, 11):
 
922
            base = TWriter()
 
923
            filter = streamio.EncodingOutputFilter(base)
 
924
            pos = 0
 
925
            while 1:
 
926
                c = chars[pos:pos+n]
 
927
                if not c:
 
928
                    break
 
929
                pos += len(c)
 
930
                filter.write(c)
 
931
            assert base.buf == data
 
932
 
 
933
 
 
934
 
 
935
# Speed test
 
936
 
 
937
FN = "BIG"
 
938
 
 
939
def timeit(fn=FN, opener=streamio.MMapFile):
 
940
    f = opener(fn, "r")
 
941
    lines = bytes = 0
 
942
    t0 = time.clock()
 
943
    for line in iter(f.readline, ""):
 
944
        lines += 1
 
945
        bytes += len(line)
 
946
    t1 = time.clock()
 
947
    print "%d lines (%d bytes) in %.3f seconds for %s" % (
 
948
        lines, bytes, t1-t0, opener.__name__)
 
949
 
 
950
def speed_main():
 
951
    def diskopen(fn, mode):
 
952
        filemode = 0
 
953
        import mmap
 
954
        if "r" in mode:
 
955
            filemode = os.O_RDONLY
 
956
        if "w" in mode:
 
957
            filemode |= os.O_WRONLY
 
958
        
 
959
        fd = os.open(fn, filemode)
 
960
        base = streamio.DiskFile(fd)
 
961
        return streamio.BufferingInputStream(base)
 
962
    def mmapopen(fn, mode):
 
963
        mmapmode = 0
 
964
        filemode = 0
 
965
        import mmap
 
966
        if "r" in mode:
 
967
            mmapmode = mmap.ACCESS_READ
 
968
            filemode = os.O_RDONLY
 
969
        if "w" in mode:
 
970
            mmapmode |= mmap.ACCESS_WRITE
 
971
            filemode |= os.O_WRONLY
 
972
        fd = os.open(fn, filemode)
 
973
        return streamio.MMapFile(fd, mmapmode)
 
974
    timeit(opener=diskopen)
 
975
    timeit(opener=mmapopen)
 
976
    timeit(opener=open)
 
977