~malept/ubuntu/lucid/python2.6/dev-dependency-fix

« back to all changes in this revision

Viewing changes to Lib/test/test_io.py

  • Committer: Bazaar Package Importer
  • Author(s): Matthias Klose
  • Date: 2009-02-13 12:51:00 UTC
  • Revision ID: james.westby@ubuntu.com-20090213125100-uufgcb9yeqzujpqw
Tags: upstream-2.6.1
ImportĀ upstreamĀ versionĀ 2.6.1

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
"""Unit tests for io.py."""
 
2
from __future__ import print_function
 
3
from __future__ import unicode_literals
 
4
 
 
5
import os
 
6
import sys
 
7
import time
 
8
import array
 
9
import threading
 
10
import random
 
11
import unittest
 
12
from itertools import chain, cycle
 
13
from test import test_support
 
14
 
 
15
import codecs
 
16
import io  # The module under test
 
17
 
 
18
 
 
19
class MockRawIO(io.RawIOBase):
 
20
 
 
21
    def __init__(self, read_stack=()):
 
22
        self._read_stack = list(read_stack)
 
23
        self._write_stack = []
 
24
 
 
25
    def read(self, n=None):
 
26
        try:
 
27
            return self._read_stack.pop(0)
 
28
        except:
 
29
            return b""
 
30
 
 
31
    def write(self, b):
 
32
        self._write_stack.append(b[:])
 
33
        return len(b)
 
34
 
 
35
    def writable(self):
 
36
        return True
 
37
 
 
38
    def fileno(self):
 
39
        return 42
 
40
 
 
41
    def readable(self):
 
42
        return True
 
43
 
 
44
    def seekable(self):
 
45
        return True
 
46
 
 
47
    def seek(self, pos, whence):
 
48
        pass
 
49
 
 
50
    def tell(self):
 
51
        return 42
 
52
 
 
53
 
 
54
class MockFileIO(io.BytesIO):
 
55
 
 
56
    def __init__(self, data):
 
57
        self.read_history = []
 
58
        io.BytesIO.__init__(self, data)
 
59
 
 
60
    def read(self, n=None):
 
61
        res = io.BytesIO.read(self, n)
 
62
        self.read_history.append(None if res is None else len(res))
 
63
        return res
 
64
 
 
65
 
 
66
class MockNonBlockWriterIO(io.RawIOBase):
 
67
 
 
68
    def __init__(self, blocking_script):
 
69
        self._blocking_script = list(blocking_script)
 
70
        self._write_stack = []
 
71
 
 
72
    def write(self, b):
 
73
        self._write_stack.append(b[:])
 
74
        n = self._blocking_script.pop(0)
 
75
        if (n < 0):
 
76
            raise io.BlockingIOError(0, "test blocking", -n)
 
77
        else:
 
78
            return n
 
79
 
 
80
    def writable(self):
 
81
        return True
 
82
 
 
83
 
 
84
class IOTest(unittest.TestCase):
 
85
 
 
86
    def tearDown(self):
 
87
        test_support.unlink(test_support.TESTFN)
 
88
 
 
89
    def write_ops(self, f):
 
90
        self.assertEqual(f.write(b"blah."), 5)
 
91
        self.assertEqual(f.seek(0), 0)
 
92
        self.assertEqual(f.write(b"Hello."), 6)
 
93
        self.assertEqual(f.tell(), 6)
 
94
        self.assertEqual(f.seek(-1, 1), 5)
 
95
        self.assertEqual(f.tell(), 5)
 
96
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
 
97
        self.assertEqual(f.seek(0), 0)
 
98
        self.assertEqual(f.write(b"h"), 1)
 
99
        self.assertEqual(f.seek(-1, 2), 13)
 
100
        self.assertEqual(f.tell(), 13)
 
101
        self.assertEqual(f.truncate(12), 12)
 
102
        self.assertEqual(f.tell(), 12)
 
103
        self.assertRaises(TypeError, f.seek, 0.0)
 
104
 
 
105
    def read_ops(self, f, buffered=False):
 
106
        data = f.read(5)
 
107
        self.assertEqual(data, b"hello")
 
108
        data = bytearray(data)
 
109
        self.assertEqual(f.readinto(data), 5)
 
110
        self.assertEqual(data, b" worl")
 
111
        self.assertEqual(f.readinto(data), 2)
 
112
        self.assertEqual(len(data), 5)
 
113
        self.assertEqual(data[:2], b"d\n")
 
114
        self.assertEqual(f.seek(0), 0)
 
115
        self.assertEqual(f.read(20), b"hello world\n")
 
116
        self.assertEqual(f.read(1), b"")
 
117
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
 
118
        self.assertEqual(f.seek(-6, 2), 6)
 
119
        self.assertEqual(f.read(5), b"world")
 
120
        self.assertEqual(f.read(0), b"")
 
121
        self.assertEqual(f.readinto(bytearray()), 0)
 
122
        self.assertEqual(f.seek(-6, 1), 5)
 
123
        self.assertEqual(f.read(5), b" worl")
 
124
        self.assertEqual(f.tell(), 10)
 
125
        self.assertRaises(TypeError, f.seek, 0.0)
 
126
        if buffered:
 
127
            f.seek(0)
 
128
            self.assertEqual(f.read(), b"hello world\n")
 
129
            f.seek(6)
 
130
            self.assertEqual(f.read(), b"world\n")
 
131
            self.assertEqual(f.read(), b"")
 
132
 
 
133
    LARGE = 2**31
 
134
 
 
135
    def large_file_ops(self, f):
 
136
        assert f.readable()
 
137
        assert f.writable()
 
138
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
 
139
        self.assertEqual(f.tell(), self.LARGE)
 
140
        self.assertEqual(f.write(b"xxx"), 3)
 
141
        self.assertEqual(f.tell(), self.LARGE + 3)
 
142
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
 
143
        self.assertEqual(f.truncate(), self.LARGE + 2)
 
144
        self.assertEqual(f.tell(), self.LARGE + 2)
 
145
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
 
146
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
 
147
        self.assertEqual(f.tell(), self.LARGE + 1)
 
148
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
 
149
        self.assertEqual(f.seek(-1, 2), self.LARGE)
 
150
        self.assertEqual(f.read(2), b"x")
 
151
 
 
152
    def test_raw_file_io(self):
 
153
        f = io.open(test_support.TESTFN, "wb", buffering=0)
 
154
        self.assertEqual(f.readable(), False)
 
155
        self.assertEqual(f.writable(), True)
 
156
        self.assertEqual(f.seekable(), True)
 
157
        self.write_ops(f)
 
158
        f.close()
 
159
        f = io.open(test_support.TESTFN, "rb", buffering=0)
 
160
        self.assertEqual(f.readable(), True)
 
161
        self.assertEqual(f.writable(), False)
 
162
        self.assertEqual(f.seekable(), True)
 
163
        self.read_ops(f)
 
164
        f.close()
 
165
 
 
166
    def test_buffered_file_io(self):
 
167
        f = io.open(test_support.TESTFN, "wb")
 
168
        self.assertEqual(f.readable(), False)
 
169
        self.assertEqual(f.writable(), True)
 
170
        self.assertEqual(f.seekable(), True)
 
171
        self.write_ops(f)
 
172
        f.close()
 
173
        f = io.open(test_support.TESTFN, "rb")
 
174
        self.assertEqual(f.readable(), True)
 
175
        self.assertEqual(f.writable(), False)
 
176
        self.assertEqual(f.seekable(), True)
 
177
        self.read_ops(f, True)
 
178
        f.close()
 
179
 
 
180
    def test_readline(self):
 
181
        f = io.open(test_support.TESTFN, "wb")
 
182
        f.write(b"abc\ndef\nxyzzy\nfoo")
 
183
        f.close()
 
184
        f = io.open(test_support.TESTFN, "rb")
 
185
        self.assertEqual(f.readline(), b"abc\n")
 
186
        self.assertEqual(f.readline(10), b"def\n")
 
187
        self.assertEqual(f.readline(2), b"xy")
 
188
        self.assertEqual(f.readline(4), b"zzy\n")
 
189
        self.assertEqual(f.readline(), b"foo")
 
190
        f.close()
 
191
 
 
192
    def test_raw_bytes_io(self):
 
193
        f = io.BytesIO()
 
194
        self.write_ops(f)
 
195
        data = f.getvalue()
 
196
        self.assertEqual(data, b"hello world\n")
 
197
        f = io.BytesIO(data)
 
198
        self.read_ops(f, True)
 
199
 
 
200
    def test_large_file_ops(self):
 
201
        # On Windows and Mac OSX this test comsumes large resources; It takes
 
202
        # a long time to build the >2GB file and takes >2GB of disk space
 
203
        # therefore the resource must be enabled to run this test.
 
204
        if sys.platform[:3] in ('win', 'os2') or sys.platform == 'darwin':
 
205
            if not test_support.is_resource_enabled("largefile"):
 
206
                print("\nTesting large file ops skipped on %s." % sys.platform,
 
207
                      file=sys.stderr)
 
208
                print("It requires %d bytes and a long time." % self.LARGE,
 
209
                      file=sys.stderr)
 
210
                print("Use 'regrtest.py -u largefile test_io' to run it.",
 
211
                      file=sys.stderr)
 
212
                return
 
213
        f = io.open(test_support.TESTFN, "w+b", 0)
 
214
        self.large_file_ops(f)
 
215
        f.close()
 
216
        f = io.open(test_support.TESTFN, "w+b")
 
217
        self.large_file_ops(f)
 
218
        f.close()
 
219
 
 
220
    def test_with_open(self):
 
221
        for bufsize in (0, 1, 100):
 
222
            f = None
 
223
            with open(test_support.TESTFN, "wb", bufsize) as f:
 
224
                f.write(b"xxx")
 
225
            self.assertEqual(f.closed, True)
 
226
            f = None
 
227
            try:
 
228
                with open(test_support.TESTFN, "wb", bufsize) as f:
 
229
                    1/0
 
230
            except ZeroDivisionError:
 
231
                self.assertEqual(f.closed, True)
 
232
            else:
 
233
                self.fail("1/0 didn't raise an exception")
 
234
 
 
235
    def test_destructor(self):
 
236
        record = []
 
237
        class MyFileIO(io.FileIO):
 
238
            def __del__(self):
 
239
                record.append(1)
 
240
                io.FileIO.__del__(self)
 
241
            def close(self):
 
242
                record.append(2)
 
243
                io.FileIO.close(self)
 
244
            def flush(self):
 
245
                record.append(3)
 
246
                io.FileIO.flush(self)
 
247
        f = MyFileIO(test_support.TESTFN, "w")
 
248
        f.write("xxx")
 
249
        del f
 
250
        self.assertEqual(record, [1, 2, 3])
 
251
 
 
252
    def test_close_flushes(self):
 
253
        f = io.open(test_support.TESTFN, "wb")
 
254
        f.write(b"xxx")
 
255
        f.close()
 
256
        f = io.open(test_support.TESTFN, "rb")
 
257
        self.assertEqual(f.read(), b"xxx")
 
258
        f.close()
 
259
 
 
260
    def XXXtest_array_writes(self):
 
261
        # XXX memory view not available yet
 
262
        a = array.array('i', range(10))
 
263
        n = len(memoryview(a))
 
264
        f = io.open(test_support.TESTFN, "wb", 0)
 
265
        self.assertEqual(f.write(a), n)
 
266
        f.close()
 
267
        f = io.open(test_support.TESTFN, "wb")
 
268
        self.assertEqual(f.write(a), n)
 
269
        f.close()
 
270
 
 
271
    def test_closefd(self):
 
272
        self.assertRaises(ValueError, io.open, test_support.TESTFN, 'w',
 
273
                          closefd=False)
 
274
 
 
275
class MemorySeekTestMixin:
 
276
 
 
277
    def testInit(self):
 
278
        buf = self.buftype("1234567890")
 
279
        bytesIo = self.ioclass(buf)
 
280
 
 
281
    def testRead(self):
 
282
        buf = self.buftype("1234567890")
 
283
        bytesIo = self.ioclass(buf)
 
284
 
 
285
        self.assertEquals(buf[:1], bytesIo.read(1))
 
286
        self.assertEquals(buf[1:5], bytesIo.read(4))
 
287
        self.assertEquals(buf[5:], bytesIo.read(900))
 
288
        self.assertEquals(self.EOF, bytesIo.read())
 
289
 
 
290
    def testReadNoArgs(self):
 
291
        buf = self.buftype("1234567890")
 
292
        bytesIo = self.ioclass(buf)
 
293
 
 
294
        self.assertEquals(buf, bytesIo.read())
 
295
        self.assertEquals(self.EOF, bytesIo.read())
 
296
 
 
297
    def testSeek(self):
 
298
        buf = self.buftype("1234567890")
 
299
        bytesIo = self.ioclass(buf)
 
300
 
 
301
        bytesIo.read(5)
 
302
        bytesIo.seek(0)
 
303
        self.assertEquals(buf, bytesIo.read())
 
304
 
 
305
        bytesIo.seek(3)
 
306
        self.assertEquals(buf[3:], bytesIo.read())
 
307
        self.assertRaises(TypeError, bytesIo.seek, 0.0)
 
308
 
 
309
    def testTell(self):
 
310
        buf = self.buftype("1234567890")
 
311
        bytesIo = self.ioclass(buf)
 
312
 
 
313
        self.assertEquals(0, bytesIo.tell())
 
314
        bytesIo.seek(5)
 
315
        self.assertEquals(5, bytesIo.tell())
 
316
        bytesIo.seek(10000)
 
317
        self.assertEquals(10000, bytesIo.tell())
 
318
 
 
319
 
 
320
class BytesIOTest(MemorySeekTestMixin, unittest.TestCase):
 
321
    @staticmethod
 
322
    def buftype(s):
 
323
        return s.encode("utf-8")
 
324
    ioclass = io.BytesIO
 
325
    EOF = b""
 
326
 
 
327
 
 
328
class StringIOTest(MemorySeekTestMixin, unittest.TestCase):
 
329
    buftype = str
 
330
    ioclass = io.StringIO
 
331
    EOF = ""
 
332
 
 
333
 
 
334
class BufferedReaderTest(unittest.TestCase):
 
335
 
 
336
    def testRead(self):
 
337
        rawio = MockRawIO((b"abc", b"d", b"efg"))
 
338
        bufio = io.BufferedReader(rawio)
 
339
 
 
340
        self.assertEquals(b"abcdef", bufio.read(6))
 
341
 
 
342
    def testBuffering(self):
 
343
        data = b"abcdefghi"
 
344
        dlen = len(data)
 
345
 
 
346
        tests = [
 
347
            [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
 
348
            [ 100, [ 3, 3, 3],     [ dlen ]    ],
 
349
            [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
 
350
        ]
 
351
 
 
352
        for bufsize, buf_read_sizes, raw_read_sizes in tests:
 
353
            rawio = MockFileIO(data)
 
354
            bufio = io.BufferedReader(rawio, buffer_size=bufsize)
 
355
            pos = 0
 
356
            for nbytes in buf_read_sizes:
 
357
                self.assertEquals(bufio.read(nbytes), data[pos:pos+nbytes])
 
358
                pos += nbytes
 
359
            self.assertEquals(rawio.read_history, raw_read_sizes)
 
360
 
 
361
    def testReadNonBlocking(self):
 
362
        # Inject some None's in there to simulate EWOULDBLOCK
 
363
        rawio = MockRawIO((b"abc", b"d", None, b"efg", None, None))
 
364
        bufio = io.BufferedReader(rawio)
 
365
 
 
366
        self.assertEquals(b"abcd", bufio.read(6))
 
367
        self.assertEquals(b"e", bufio.read(1))
 
368
        self.assertEquals(b"fg", bufio.read())
 
369
        self.assert_(None is bufio.read())
 
370
        self.assertEquals(b"", bufio.read())
 
371
 
 
372
    def testReadToEof(self):
 
373
        rawio = MockRawIO((b"abc", b"d", b"efg"))
 
374
        bufio = io.BufferedReader(rawio)
 
375
 
 
376
        self.assertEquals(b"abcdefg", bufio.read(9000))
 
377
 
 
378
    def testReadNoArgs(self):
 
379
        rawio = MockRawIO((b"abc", b"d", b"efg"))
 
380
        bufio = io.BufferedReader(rawio)
 
381
 
 
382
        self.assertEquals(b"abcdefg", bufio.read())
 
383
 
 
384
    def testFileno(self):
 
385
        rawio = MockRawIO((b"abc", b"d", b"efg"))
 
386
        bufio = io.BufferedReader(rawio)
 
387
 
 
388
        self.assertEquals(42, bufio.fileno())
 
389
 
 
390
    def testFilenoNoFileno(self):
 
391
        # XXX will we always have fileno() function? If so, kill
 
392
        # this test. Else, write it.
 
393
        pass
 
394
 
 
395
    def testThreads(self):
 
396
        try:
 
397
            # Write out many bytes with exactly the same number of 0's,
 
398
            # 1's... 255's. This will help us check that concurrent reading
 
399
            # doesn't duplicate or forget contents.
 
400
            N = 1000
 
401
            l = range(256) * N
 
402
            random.shuffle(l)
 
403
            s = bytes(bytearray(l))
 
404
            with io.open(test_support.TESTFN, "wb") as f:
 
405
                f.write(s)
 
406
            with io.open(test_support.TESTFN, "rb", buffering=0) as raw:
 
407
                bufio = io.BufferedReader(raw, 8)
 
408
                errors = []
 
409
                results = []
 
410
                def f():
 
411
                    try:
 
412
                        # Intra-buffer read then buffer-flushing read
 
413
                        for n in cycle([1, 19]):
 
414
                            s = bufio.read(n)
 
415
                            if not s:
 
416
                                break
 
417
                            # list.append() is atomic
 
418
                            results.append(s)
 
419
                    except Exception as e:
 
420
                        errors.append(e)
 
421
                        raise
 
422
                threads = [threading.Thread(target=f) for x in range(20)]
 
423
                for t in threads:
 
424
                    t.start()
 
425
                time.sleep(0.02) # yield
 
426
                for t in threads:
 
427
                    t.join()
 
428
                self.assertFalse(errors,
 
429
                    "the following exceptions were caught: %r" % errors)
 
430
                s = b''.join(results)
 
431
                for i in range(256):
 
432
                    c = bytes(bytearray([i]))
 
433
                    self.assertEqual(s.count(c), N)
 
434
        finally:
 
435
            test_support.unlink(test_support.TESTFN)
 
436
 
 
437
 
 
438
 
 
439
class BufferedWriterTest(unittest.TestCase):
 
440
 
 
441
    def testWrite(self):
 
442
        # Write to the buffered IO but don't overflow the buffer.
 
443
        writer = MockRawIO()
 
444
        bufio = io.BufferedWriter(writer, 8)
 
445
 
 
446
        bufio.write(b"abc")
 
447
 
 
448
        self.assertFalse(writer._write_stack)
 
449
 
 
450
    def testWriteOverflow(self):
 
451
        writer = MockRawIO()
 
452
        bufio = io.BufferedWriter(writer, 8)
 
453
 
 
454
        bufio.write(b"abc")
 
455
        bufio.write(b"defghijkl")
 
456
 
 
457
        self.assertEquals(b"abcdefghijkl", writer._write_stack[0])
 
458
 
 
459
    def testWriteNonBlocking(self):
 
460
        raw = MockNonBlockWriterIO((9, 2, 22, -6, 10, 12, 12))
 
461
        bufio = io.BufferedWriter(raw, 8, 16)
 
462
 
 
463
        bufio.write(b"asdf")
 
464
        bufio.write(b"asdfa")
 
465
        self.assertEquals(b"asdfasdfa", raw._write_stack[0])
 
466
 
 
467
        bufio.write(b"asdfasdfasdf")
 
468
        self.assertEquals(b"asdfasdfasdf", raw._write_stack[1])
 
469
        bufio.write(b"asdfasdfasdf")
 
470
        self.assertEquals(b"dfasdfasdf", raw._write_stack[2])
 
471
        self.assertEquals(b"asdfasdfasdf", raw._write_stack[3])
 
472
 
 
473
        bufio.write(b"asdfasdfasdf")
 
474
 
 
475
        # XXX I don't like this test. It relies too heavily on how the
 
476
        # algorithm actually works, which we might change. Refactor
 
477
        # later.
 
478
 
 
479
    def testFileno(self):
 
480
        rawio = MockRawIO((b"abc", b"d", b"efg"))
 
481
        bufio = io.BufferedWriter(rawio)
 
482
 
 
483
        self.assertEquals(42, bufio.fileno())
 
484
 
 
485
    def testFlush(self):
 
486
        writer = MockRawIO()
 
487
        bufio = io.BufferedWriter(writer, 8)
 
488
 
 
489
        bufio.write(b"abc")
 
490
        bufio.flush()
 
491
 
 
492
        self.assertEquals(b"abc", writer._write_stack[0])
 
493
 
 
494
    def testThreads(self):
 
495
        # BufferedWriter should not raise exceptions or crash
 
496
        # when called from multiple threads.
 
497
        try:
 
498
            # We use a real file object because it allows us to
 
499
            # exercise situations where the GIL is released before
 
500
            # writing the buffer to the raw streams. This is in addition
 
501
            # to concurrency issues due to switching threads in the middle
 
502
            # of Python code.
 
503
            with io.open(test_support.TESTFN, "wb", buffering=0) as raw:
 
504
                bufio = io.BufferedWriter(raw, 8)
 
505
                errors = []
 
506
                def f():
 
507
                    try:
 
508
                        # Write enough bytes to flush the buffer
 
509
                        s = b"a" * 19
 
510
                        for i in range(50):
 
511
                            bufio.write(s)
 
512
                    except Exception as e:
 
513
                        errors.append(e)
 
514
                        raise
 
515
                threads = [threading.Thread(target=f) for x in range(20)]
 
516
                for t in threads:
 
517
                    t.start()
 
518
                time.sleep(0.02) # yield
 
519
                for t in threads:
 
520
                    t.join()
 
521
                self.assertFalse(errors,
 
522
                    "the following exceptions were caught: %r" % errors)
 
523
        finally:
 
524
            test_support.unlink(test_support.TESTFN)
 
525
 
 
526
 
 
527
class BufferedRWPairTest(unittest.TestCase):
 
528
 
 
529
    def testRWPair(self):
 
530
        r = MockRawIO(())
 
531
        w = MockRawIO()
 
532
        pair = io.BufferedRWPair(r, w)
 
533
 
 
534
        # XXX need implementation
 
535
 
 
536
 
 
537
class BufferedRandomTest(unittest.TestCase):
 
538
 
 
539
    def testReadAndWrite(self):
 
540
        raw = MockRawIO((b"asdf", b"ghjk"))
 
541
        rw = io.BufferedRandom(raw, 8, 12)
 
542
 
 
543
        self.assertEqual(b"as", rw.read(2))
 
544
        rw.write(b"ddd")
 
545
        rw.write(b"eee")
 
546
        self.assertFalse(raw._write_stack) # Buffer writes
 
547
        self.assertEqual(b"ghjk", rw.read()) # This read forces write flush
 
548
        self.assertEquals(b"dddeee", raw._write_stack[0])
 
549
 
 
550
    def testSeekAndTell(self):
 
551
        raw = io.BytesIO(b"asdfghjkl")
 
552
        rw = io.BufferedRandom(raw)
 
553
 
 
554
        self.assertEquals(b"as", rw.read(2))
 
555
        self.assertEquals(2, rw.tell())
 
556
        rw.seek(0, 0)
 
557
        self.assertEquals(b"asdf", rw.read(4))
 
558
 
 
559
        rw.write(b"asdf")
 
560
        rw.seek(0, 0)
 
561
        self.assertEquals(b"asdfasdfl", rw.read())
 
562
        self.assertEquals(9, rw.tell())
 
563
        rw.seek(-4, 2)
 
564
        self.assertEquals(5, rw.tell())
 
565
        rw.seek(2, 1)
 
566
        self.assertEquals(7, rw.tell())
 
567
        self.assertEquals(b"fl", rw.read(11))
 
568
        self.assertRaises(TypeError, rw.seek, 0.0)
 
569
 
 
570
# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
 
571
# properties:
 
572
#   - A single output character can correspond to many bytes of input.
 
573
#   - The number of input bytes to complete the character can be
 
574
#     undetermined until the last input byte is received.
 
575
#   - The number of input bytes can vary depending on previous input.
 
576
#   - A single input byte can correspond to many characters of output.
 
577
#   - The number of output characters can be undetermined until the
 
578
#     last input byte is received.
 
579
#   - The number of output characters can vary depending on previous input.
 
580
 
 
581
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
 
582
    """
 
583
    For testing seek/tell behavior with a stateful, buffering decoder.
 
584
 
 
585
    Input is a sequence of words.  Words may be fixed-length (length set
 
586
    by input) or variable-length (period-terminated).  In variable-length
 
587
    mode, extra periods are ignored.  Possible words are:
 
588
      - 'i' followed by a number sets the input length, I (maximum 99).
 
589
        When I is set to 0, words are space-terminated.
 
590
      - 'o' followed by a number sets the output length, O (maximum 99).
 
591
      - Any other word is converted into a word followed by a period on
 
592
        the output.  The output word consists of the input word truncated
 
593
        or padded out with hyphens to make its length equal to O.  If O
 
594
        is 0, the word is output verbatim without truncating or padding.
 
595
    I and O are initially set to 1.  When I changes, any buffered input is
 
596
    re-scanned according to the new I.  EOF also terminates the last word.
 
597
    """
 
598
 
 
599
    def __init__(self, errors='strict'):
 
600
        codecs.IncrementalDecoder.__init__(self, errors)
 
601
        self.reset()
 
602
 
 
603
    def __repr__(self):
 
604
        return '<SID %x>' % id(self)
 
605
 
 
606
    def reset(self):
 
607
        self.i = 1
 
608
        self.o = 1
 
609
        self.buffer = bytearray()
 
610
 
 
611
    def getstate(self):
 
612
        i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
 
613
        return bytes(self.buffer), i*100 + o
 
614
 
 
615
    def setstate(self, state):
 
616
        buffer, io = state
 
617
        self.buffer = bytearray(buffer)
 
618
        i, o = divmod(io, 100)
 
619
        self.i, self.o = i ^ 1, o ^ 1
 
620
 
 
621
    def decode(self, input, final=False):
 
622
        output = ''
 
623
        for b in input:
 
624
            if self.i == 0: # variable-length, terminated with period
 
625
                if b == '.':
 
626
                    if self.buffer:
 
627
                        output += self.process_word()
 
628
                else:
 
629
                    self.buffer.append(b)
 
630
            else: # fixed-length, terminate after self.i bytes
 
631
                self.buffer.append(b)
 
632
                if len(self.buffer) == self.i:
 
633
                    output += self.process_word()
 
634
        if final and self.buffer: # EOF terminates the last word
 
635
            output += self.process_word()
 
636
        return output
 
637
 
 
638
    def process_word(self):
 
639
        output = ''
 
640
        if self.buffer[0] == ord('i'):
 
641
            self.i = min(99, int(self.buffer[1:] or 0)) # set input length
 
642
        elif self.buffer[0] == ord('o'):
 
643
            self.o = min(99, int(self.buffer[1:] or 0)) # set output length
 
644
        else:
 
645
            output = self.buffer.decode('ascii')
 
646
            if len(output) < self.o:
 
647
                output += '-'*self.o # pad out with hyphens
 
648
            if self.o:
 
649
                output = output[:self.o] # truncate to output length
 
650
            output += '.'
 
651
        self.buffer = bytearray()
 
652
        return output
 
653
 
 
654
    codecEnabled = False
 
655
 
 
656
    @classmethod
 
657
    def lookupTestDecoder(cls, name):
 
658
        if cls.codecEnabled and name == 'test_decoder':
 
659
            return codecs.CodecInfo(
 
660
                name='test_decoder', encode=None, decode=None,
 
661
                incrementalencoder=None,
 
662
                streamreader=None, streamwriter=None,
 
663
                incrementaldecoder=cls)
 
664
 
 
665
# Register the previous decoder for testing.
 
666
# Disabled by default, tests will enable it.
 
667
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
 
668
 
 
669
 
 
670
class StatefulIncrementalDecoderTest(unittest.TestCase):
 
671
    """
 
672
    Make sure the StatefulIncrementalDecoder actually works.
 
673
    """
 
674
 
 
675
    test_cases = [
 
676
        # I=1, O=1 (fixed-length input == fixed-length output)
 
677
        (b'abcd', False, 'a.b.c.d.'),
 
678
        # I=0, O=0 (variable-length input, variable-length output)
 
679
        (b'oiabcd', True, 'abcd.'),
 
680
        # I=0, O=0 (should ignore extra periods)
 
681
        (b'oi...abcd...', True, 'abcd.'),
 
682
        # I=0, O=6 (variable-length input, fixed-length output)
 
683
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
 
684
        # I=2, O=6 (fixed-length input < fixed-length output)
 
685
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
 
686
        # I=6, O=3 (fixed-length input > fixed-length output)
 
687
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
 
688
        # I=0, then 3; O=29, then 15 (with longer output)
 
689
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
 
690
         'a----------------------------.' +
 
691
         'b----------------------------.' +
 
692
         'cde--------------------------.' +
 
693
         'abcdefghijabcde.' +
 
694
         'a.b------------.' +
 
695
         '.c.------------.' +
 
696
         'd.e------------.' +
 
697
         'k--------------.' +
 
698
         'l--------------.' +
 
699
         'm--------------.')
 
700
    ]
 
701
 
 
702
    def testDecoder(self):
 
703
        # Try a few one-shot test cases.
 
704
        for input, eof, output in self.test_cases:
 
705
            d = StatefulIncrementalDecoder()
 
706
            self.assertEquals(d.decode(input, eof), output)
 
707
 
 
708
        # Also test an unfinished decode, followed by forcing EOF.
 
709
        d = StatefulIncrementalDecoder()
 
710
        self.assertEquals(d.decode(b'oiabcd'), '')
 
711
        self.assertEquals(d.decode(b'', 1), 'abcd.')
 
712
 
 
713
class TextIOWrapperTest(unittest.TestCase):
 
714
 
 
715
    def setUp(self):
 
716
        self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
 
717
        self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
 
718
 
 
719
    def tearDown(self):
 
720
        test_support.unlink(test_support.TESTFN)
 
721
 
 
722
    def testLineBuffering(self):
 
723
        r = io.BytesIO()
 
724
        b = io.BufferedWriter(r, 1000)
 
725
        t = io.TextIOWrapper(b, newline="\n", line_buffering=True)
 
726
        t.write(u"X")
 
727
        self.assertEquals(r.getvalue(), b"")  # No flush happened
 
728
        t.write(u"Y\nZ")
 
729
        self.assertEquals(r.getvalue(), b"XY\nZ")  # All got flushed
 
730
        t.write(u"A\rB")
 
731
        self.assertEquals(r.getvalue(), b"XY\nZA\rB")
 
732
 
 
733
    def testEncodingErrorsReading(self):
 
734
        # (1) default
 
735
        b = io.BytesIO(b"abc\n\xff\n")
 
736
        t = io.TextIOWrapper(b, encoding="ascii")
 
737
        self.assertRaises(UnicodeError, t.read)
 
738
        # (2) explicit strict
 
739
        b = io.BytesIO(b"abc\n\xff\n")
 
740
        t = io.TextIOWrapper(b, encoding="ascii", errors="strict")
 
741
        self.assertRaises(UnicodeError, t.read)
 
742
        # (3) ignore
 
743
        b = io.BytesIO(b"abc\n\xff\n")
 
744
        t = io.TextIOWrapper(b, encoding="ascii", errors="ignore")
 
745
        self.assertEquals(t.read(), "abc\n\n")
 
746
        # (4) replace
 
747
        b = io.BytesIO(b"abc\n\xff\n")
 
748
        t = io.TextIOWrapper(b, encoding="ascii", errors="replace")
 
749
        self.assertEquals(t.read(), u"abc\n\ufffd\n")
 
750
 
 
751
    def testEncodingErrorsWriting(self):
 
752
        # (1) default
 
753
        b = io.BytesIO()
 
754
        t = io.TextIOWrapper(b, encoding="ascii")
 
755
        self.assertRaises(UnicodeError, t.write, u"\xff")
 
756
        # (2) explicit strict
 
757
        b = io.BytesIO()
 
758
        t = io.TextIOWrapper(b, encoding="ascii", errors="strict")
 
759
        self.assertRaises(UnicodeError, t.write, u"\xff")
 
760
        # (3) ignore
 
761
        b = io.BytesIO()
 
762
        t = io.TextIOWrapper(b, encoding="ascii", errors="ignore",
 
763
                             newline="\n")
 
764
        t.write(u"abc\xffdef\n")
 
765
        t.flush()
 
766
        self.assertEquals(b.getvalue(), b"abcdef\n")
 
767
        # (4) replace
 
768
        b = io.BytesIO()
 
769
        t = io.TextIOWrapper(b, encoding="ascii", errors="replace",
 
770
                             newline="\n")
 
771
        t.write(u"abc\xffdef\n")
 
772
        t.flush()
 
773
        self.assertEquals(b.getvalue(), b"abc?def\n")
 
774
 
 
775
    def testNewlinesInput(self):
 
776
        testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
 
777
        normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
 
778
        for newline, expected in [
 
779
            (None, normalized.decode("ascii").splitlines(True)),
 
780
            ("", testdata.decode("ascii").splitlines(True)),
 
781
            ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
 
782
            ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
 
783
            ("\r",  ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
 
784
            ]:
 
785
            buf = io.BytesIO(testdata)
 
786
            txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
 
787
            self.assertEquals(txt.readlines(), expected)
 
788
            txt.seek(0)
 
789
            self.assertEquals(txt.read(), "".join(expected))
 
790
 
 
791
    def testNewlinesOutput(self):
 
792
        testdict = {
 
793
            "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
 
794
            "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
 
795
            "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
 
796
            "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
 
797
            }
 
798
        tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
 
799
        for newline, expected in tests:
 
800
            buf = io.BytesIO()
 
801
            txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
 
802
            txt.write("AAA\nB")
 
803
            txt.write("BB\nCCC\n")
 
804
            txt.write("X\rY\r\nZ")
 
805
            txt.flush()
 
806
            self.assertEquals(buf.closed, False)
 
807
            self.assertEquals(buf.getvalue(), expected)
 
808
 
 
809
    def testNewlines(self):
 
810
        input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
 
811
 
 
812
        tests = [
 
813
            [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
 
814
            [ '', input_lines ],
 
815
            [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
 
816
            [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
 
817
            [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
 
818
        ]
 
819
 
 
820
        encodings = ('utf-8', 'latin-1')
 
821
 
 
822
        # Try a range of buffer sizes to test the case where \r is the last
 
823
        # character in TextIOWrapper._pending_line.
 
824
        for encoding in encodings:
 
825
            # XXX: str.encode() should return bytes
 
826
            data = bytes(''.join(input_lines).encode(encoding))
 
827
            for do_reads in (False, True):
 
828
                for bufsize in range(1, 10):
 
829
                    for newline, exp_lines in tests:
 
830
                        bufio = io.BufferedReader(io.BytesIO(data), bufsize)
 
831
                        textio = io.TextIOWrapper(bufio, newline=newline,
 
832
                                                  encoding=encoding)
 
833
                        if do_reads:
 
834
                            got_lines = []
 
835
                            while True:
 
836
                                c2 = textio.read(2)
 
837
                                if c2 == '':
 
838
                                    break
 
839
                                self.assertEquals(len(c2), 2)
 
840
                                got_lines.append(c2 + textio.readline())
 
841
                        else:
 
842
                            got_lines = list(textio)
 
843
 
 
844
                        for got_line, exp_line in zip(got_lines, exp_lines):
 
845
                            self.assertEquals(got_line, exp_line)
 
846
                        self.assertEquals(len(got_lines), len(exp_lines))
 
847
 
 
848
    def testNewlinesInput(self):
 
849
        testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
 
850
        normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
 
851
        for newline, expected in [
 
852
            (None, normalized.decode("ascii").splitlines(True)),
 
853
            ("", testdata.decode("ascii").splitlines(True)),
 
854
            ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
 
855
            ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
 
856
            ("\r",  ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
 
857
            ]:
 
858
            buf = io.BytesIO(testdata)
 
859
            txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
 
860
            self.assertEquals(txt.readlines(), expected)
 
861
            txt.seek(0)
 
862
            self.assertEquals(txt.read(), "".join(expected))
 
863
 
 
864
    def testNewlinesOutput(self):
 
865
        data = u"AAA\nBBB\rCCC\n"
 
866
        data_lf = b"AAA\nBBB\rCCC\n"
 
867
        data_cr = b"AAA\rBBB\rCCC\r"
 
868
        data_crlf = b"AAA\r\nBBB\rCCC\r\n"
 
869
        save_linesep = os.linesep
 
870
        try:
 
871
            for os.linesep, newline, expected in [
 
872
                ("\n", None, data_lf),
 
873
                ("\r\n", None, data_crlf),
 
874
                ("\n", "", data_lf),
 
875
                ("\r\n", "", data_lf),
 
876
                ("\n", "\n", data_lf),
 
877
                ("\r\n", "\n", data_lf),
 
878
                ("\n", "\r", data_cr),
 
879
                ("\r\n", "\r", data_cr),
 
880
                ("\n", "\r\n", data_crlf),
 
881
                ("\r\n", "\r\n", data_crlf),
 
882
                ]:
 
883
                buf = io.BytesIO()
 
884
                txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
 
885
                txt.write(data)
 
886
                txt.close()
 
887
                self.assertEquals(buf.closed, True)
 
888
                self.assertRaises(ValueError, buf.getvalue)
 
889
        finally:
 
890
            os.linesep = save_linesep
 
891
 
 
892
    # Systematic tests of the text I/O API
 
893
 
 
894
    def testBasicIO(self):
 
895
        for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
 
896
            for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
 
897
                f = io.open(test_support.TESTFN, "w+", encoding=enc)
 
898
                f._CHUNK_SIZE = chunksize
 
899
                self.assertEquals(f.write(u"abc"), 3)
 
900
                f.close()
 
901
                f = io.open(test_support.TESTFN, "r+", encoding=enc)
 
902
                f._CHUNK_SIZE = chunksize
 
903
                self.assertEquals(f.tell(), 0)
 
904
                self.assertEquals(f.read(), u"abc")
 
905
                cookie = f.tell()
 
906
                self.assertEquals(f.seek(0), 0)
 
907
                self.assertEquals(f.read(2), u"ab")
 
908
                self.assertEquals(f.read(1), u"c")
 
909
                self.assertEquals(f.read(1), u"")
 
910
                self.assertEquals(f.read(), u"")
 
911
                self.assertEquals(f.tell(), cookie)
 
912
                self.assertEquals(f.seek(0), 0)
 
913
                self.assertEquals(f.seek(0, 2), cookie)
 
914
                self.assertEquals(f.write(u"def"), 3)
 
915
                self.assertEquals(f.seek(cookie), cookie)
 
916
                self.assertEquals(f.read(), u"def")
 
917
                if enc.startswith("utf"):
 
918
                    self.multi_line_test(f, enc)
 
919
                f.close()
 
920
 
 
921
    def multi_line_test(self, f, enc):
 
922
        f.seek(0)
 
923
        f.truncate()
 
924
        sample = u"s\xff\u0fff\uffff"
 
925
        wlines = []
 
926
        for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
 
927
            chars = []
 
928
            for i in range(size):
 
929
                chars.append(sample[i % len(sample)])
 
930
            line = u"".join(chars) + u"\n"
 
931
            wlines.append((f.tell(), line))
 
932
            f.write(line)
 
933
        f.seek(0)
 
934
        rlines = []
 
935
        while True:
 
936
            pos = f.tell()
 
937
            line = f.readline()
 
938
            if not line:
 
939
                break
 
940
            rlines.append((pos, line))
 
941
        self.assertEquals(rlines, wlines)
 
942
 
 
943
    def testTelling(self):
 
944
        f = io.open(test_support.TESTFN, "w+", encoding="utf8")
 
945
        p0 = f.tell()
 
946
        f.write(u"\xff\n")
 
947
        p1 = f.tell()
 
948
        f.write(u"\xff\n")
 
949
        p2 = f.tell()
 
950
        f.seek(0)
 
951
        self.assertEquals(f.tell(), p0)
 
952
        self.assertEquals(f.readline(), u"\xff\n")
 
953
        self.assertEquals(f.tell(), p1)
 
954
        self.assertEquals(f.readline(), u"\xff\n")
 
955
        self.assertEquals(f.tell(), p2)
 
956
        f.seek(0)
 
957
        for line in f:
 
958
            self.assertEquals(line, u"\xff\n")
 
959
            self.assertRaises(IOError, f.tell)
 
960
        self.assertEquals(f.tell(), p2)
 
961
        f.close()
 
962
 
 
963
    def testSeeking(self):
 
964
        chunk_size = io.TextIOWrapper._CHUNK_SIZE
 
965
        prefix_size = chunk_size - 2
 
966
        u_prefix = "a" * prefix_size
 
967
        prefix = bytes(u_prefix.encode("utf-8"))
 
968
        self.assertEquals(len(u_prefix), len(prefix))
 
969
        u_suffix = "\u8888\n"
 
970
        suffix = bytes(u_suffix.encode("utf-8"))
 
971
        line = prefix + suffix
 
972
        f = io.open(test_support.TESTFN, "wb")
 
973
        f.write(line*2)
 
974
        f.close()
 
975
        f = io.open(test_support.TESTFN, "r", encoding="utf-8")
 
976
        s = f.read(prefix_size)
 
977
        self.assertEquals(s, unicode(prefix, "ascii"))
 
978
        self.assertEquals(f.tell(), prefix_size)
 
979
        self.assertEquals(f.readline(), u_suffix)
 
980
 
 
981
    def testSeekingToo(self):
 
982
        # Regression test for a specific bug
 
983
        data = b'\xe0\xbf\xbf\n'
 
984
        f = io.open(test_support.TESTFN, "wb")
 
985
        f.write(data)
 
986
        f.close()
 
987
        f = io.open(test_support.TESTFN, "r", encoding="utf-8")
 
988
        f._CHUNK_SIZE  # Just test that it exists
 
989
        f._CHUNK_SIZE = 2
 
990
        f.readline()
 
991
        f.tell()
 
992
 
 
993
    def testSeekAndTell(self):
 
994
        """Test seek/tell using the StatefulIncrementalDecoder."""
 
995
 
 
996
        def testSeekAndTellWithData(data, min_pos=0):
 
997
            """Tell/seek to various points within a data stream and ensure
 
998
            that the decoded data returned by read() is consistent."""
 
999
            f = io.open(test_support.TESTFN, 'wb')
 
1000
            f.write(data)
 
1001
            f.close()
 
1002
            f = io.open(test_support.TESTFN, encoding='test_decoder')
 
1003
            decoded = f.read()
 
1004
            f.close()
 
1005
 
 
1006
            for i in range(min_pos, len(decoded) + 1): # seek positions
 
1007
                for j in [1, 5, len(decoded) - i]: # read lengths
 
1008
                    f = io.open(test_support.TESTFN, encoding='test_decoder')
 
1009
                    self.assertEquals(f.read(i), decoded[:i])
 
1010
                    cookie = f.tell()
 
1011
                    self.assertEquals(f.read(j), decoded[i:i + j])
 
1012
                    f.seek(cookie)
 
1013
                    self.assertEquals(f.read(), decoded[i:])
 
1014
                    f.close()
 
1015
 
 
1016
        # Enable the test decoder.
 
1017
        StatefulIncrementalDecoder.codecEnabled = 1
 
1018
 
 
1019
        # Run the tests.
 
1020
        try:
 
1021
            # Try each test case.
 
1022
            for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
 
1023
                testSeekAndTellWithData(input)
 
1024
 
 
1025
            # Position each test case so that it crosses a chunk boundary.
 
1026
            CHUNK_SIZE = io.TextIOWrapper._CHUNK_SIZE
 
1027
            for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
 
1028
                offset = CHUNK_SIZE - len(input)//2
 
1029
                prefix = b'.'*offset
 
1030
                # Don't bother seeking into the prefix (takes too long).
 
1031
                min_pos = offset*2
 
1032
                testSeekAndTellWithData(prefix + input, min_pos)
 
1033
 
 
1034
        # Ensure our test decoder won't interfere with subsequent tests.
 
1035
        finally:
 
1036
            StatefulIncrementalDecoder.codecEnabled = 0
 
1037
 
 
1038
    def testEncodedWrites(self):
 
1039
        data = u"1234567890"
 
1040
        tests = ("utf-16",
 
1041
                 "utf-16-le",
 
1042
                 "utf-16-be",
 
1043
                 "utf-32",
 
1044
                 "utf-32-le",
 
1045
                 "utf-32-be")
 
1046
        for encoding in tests:
 
1047
            buf = io.BytesIO()
 
1048
            f = io.TextIOWrapper(buf, encoding=encoding)
 
1049
            # Check if the BOM is written only once (see issue1753).
 
1050
            f.write(data)
 
1051
            f.write(data)
 
1052
            f.seek(0)
 
1053
            self.assertEquals(f.read(), data * 2)
 
1054
            self.assertEquals(buf.getvalue(), (data * 2).encode(encoding))
 
1055
 
 
1056
    def timingTest(self):
 
1057
        timer = time.time
 
1058
        enc = "utf8"
 
1059
        line = "\0\x0f\xff\u0fff\uffff\U000fffff\U0010ffff"*3 + "\n"
 
1060
        nlines = 10000
 
1061
        nchars = len(line)
 
1062
        nbytes = len(line.encode(enc))
 
1063
        for chunk_size in (32, 64, 128, 256):
 
1064
            f = io.open(test_support.TESTFN, "w+", encoding=enc)
 
1065
            f._CHUNK_SIZE = chunk_size
 
1066
            t0 = timer()
 
1067
            for i in range(nlines):
 
1068
                f.write(line)
 
1069
            f.flush()
 
1070
            t1 = timer()
 
1071
            f.seek(0)
 
1072
            for line in f:
 
1073
                pass
 
1074
            t2 = timer()
 
1075
            f.seek(0)
 
1076
            while f.readline():
 
1077
                pass
 
1078
            t3 = timer()
 
1079
            f.seek(0)
 
1080
            while f.readline():
 
1081
                f.tell()
 
1082
            t4 = timer()
 
1083
            f.close()
 
1084
            if test_support.verbose:
 
1085
                print("\nTiming test: %d lines of %d characters (%d bytes)" %
 
1086
                      (nlines, nchars, nbytes))
 
1087
                print("File chunk size:          %6s" % f._CHUNK_SIZE)
 
1088
                print("Writing:                  %6.3f seconds" % (t1-t0))
 
1089
                print("Reading using iteration:  %6.3f seconds" % (t2-t1))
 
1090
                print("Reading using readline(): %6.3f seconds" % (t3-t2))
 
1091
                print("Using readline()+tell():  %6.3f seconds" % (t4-t3))
 
1092
 
 
1093
    def testReadOneByOne(self):
 
1094
        txt = io.TextIOWrapper(io.BytesIO(b"AA\r\nBB"))
 
1095
        reads = ""
 
1096
        while True:
 
1097
            c = txt.read(1)
 
1098
            if not c:
 
1099
                break
 
1100
            reads += c
 
1101
        self.assertEquals(reads, "AA\nBB")
 
1102
 
 
1103
    # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
 
1104
    def testReadByChunk(self):
 
1105
        # make sure "\r\n" straddles 128 char boundary.
 
1106
        txt = io.TextIOWrapper(io.BytesIO(b"A" * 127 + b"\r\nB"))
 
1107
        reads = ""
 
1108
        while True:
 
1109
            c = txt.read(128)
 
1110
            if not c:
 
1111
                break
 
1112
            reads += c
 
1113
        self.assertEquals(reads, "A"*127+"\nB")
 
1114
 
 
1115
    def test_issue1395_1(self):
 
1116
        txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
 
1117
 
 
1118
        # read one char at a time
 
1119
        reads = ""
 
1120
        while True:
 
1121
            c = txt.read(1)
 
1122
            if not c:
 
1123
                break
 
1124
            reads += c
 
1125
        self.assertEquals(reads, self.normalized)
 
1126
 
 
1127
    def test_issue1395_2(self):
 
1128
        txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
 
1129
        txt._CHUNK_SIZE = 4
 
1130
 
 
1131
        reads = ""
 
1132
        while True:
 
1133
            c = txt.read(4)
 
1134
            if not c:
 
1135
                break
 
1136
            reads += c
 
1137
        self.assertEquals(reads, self.normalized)
 
1138
 
 
1139
    def test_issue1395_3(self):
 
1140
        txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
 
1141
        txt._CHUNK_SIZE = 4
 
1142
 
 
1143
        reads = txt.read(4)
 
1144
        reads += txt.read(4)
 
1145
        reads += txt.readline()
 
1146
        reads += txt.readline()
 
1147
        reads += txt.readline()
 
1148
        self.assertEquals(reads, self.normalized)
 
1149
 
 
1150
    def test_issue1395_4(self):
 
1151
        txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
 
1152
        txt._CHUNK_SIZE = 4
 
1153
 
 
1154
        reads = txt.read(4)
 
1155
        reads += txt.read()
 
1156
        self.assertEquals(reads, self.normalized)
 
1157
 
 
1158
    def test_issue1395_5(self):
 
1159
        txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
 
1160
        txt._CHUNK_SIZE = 4
 
1161
 
 
1162
        reads = txt.read(4)
 
1163
        pos = txt.tell()
 
1164
        txt.seek(0)
 
1165
        txt.seek(pos)
 
1166
        self.assertEquals(txt.read(4), "BBB\n")
 
1167
 
 
1168
    def test_issue2282(self):
 
1169
        buffer = io.BytesIO(self.testdata)
 
1170
        txt = io.TextIOWrapper(buffer, encoding="ascii")
 
1171
 
 
1172
        self.assertEqual(buffer.seekable(), txt.seekable())
 
1173
 
 
1174
    def test_newline_decoder(self):
 
1175
        import codecs
 
1176
        decoder = codecs.getincrementaldecoder("utf-8")()
 
1177
        decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
 
1178
 
 
1179
        self.assertEquals(decoder.decode(b'\xe8\xa2\x88'), u"\u8888")
 
1180
 
 
1181
        self.assertEquals(decoder.decode(b'\xe8'), u"")
 
1182
        self.assertEquals(decoder.decode(b'\xa2'), u"")
 
1183
        self.assertEquals(decoder.decode(b'\x88'), u"\u8888")
 
1184
 
 
1185
        self.assertEquals(decoder.decode(b'\xe8'), u"")
 
1186
        self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
 
1187
 
 
1188
        decoder.setstate((b'', 0))
 
1189
        self.assertEquals(decoder.decode(b'\n'), u"\n")
 
1190
        self.assertEquals(decoder.decode(b'\r'), u"")
 
1191
        self.assertEquals(decoder.decode(b'', final=True), u"\n")
 
1192
        self.assertEquals(decoder.decode(b'\r', final=True), u"\n")
 
1193
 
 
1194
        self.assertEquals(decoder.decode(b'\r'), u"")
 
1195
        self.assertEquals(decoder.decode(b'a'), u"\na")
 
1196
 
 
1197
        self.assertEquals(decoder.decode(b'\r\r\n'), u"\n\n")
 
1198
        self.assertEquals(decoder.decode(b'\r'), u"")
 
1199
        self.assertEquals(decoder.decode(b'\r'), u"\n")
 
1200
        self.assertEquals(decoder.decode(b'\na'), u"\na")
 
1201
 
 
1202
        self.assertEquals(decoder.decode(b'\xe8\xa2\x88\r\n'), u"\u8888\n")
 
1203
        self.assertEquals(decoder.decode(b'\xe8\xa2\x88'), u"\u8888")
 
1204
        self.assertEquals(decoder.decode(b'\n'), u"\n")
 
1205
        self.assertEquals(decoder.decode(b'\xe8\xa2\x88\r'), u"\u8888")
 
1206
        self.assertEquals(decoder.decode(b'\n'), u"\n")
 
1207
 
 
1208
        decoder = codecs.getincrementaldecoder("utf-8")()
 
1209
        decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
 
1210
        self.assertEquals(decoder.newlines, None)
 
1211
        decoder.decode(b"abc\n\r")
 
1212
        self.assertEquals(decoder.newlines, u'\n')
 
1213
        decoder.decode(b"\nabc")
 
1214
        self.assertEquals(decoder.newlines, ('\n', '\r\n'))
 
1215
        decoder.decode(b"abc\r")
 
1216
        self.assertEquals(decoder.newlines, ('\n', '\r\n'))
 
1217
        decoder.decode(b"abc")
 
1218
        self.assertEquals(decoder.newlines, ('\r', '\n', '\r\n'))
 
1219
        decoder.decode(b"abc\r")
 
1220
        decoder.reset()
 
1221
        self.assertEquals(decoder.decode(b"abc"), "abc")
 
1222
        self.assertEquals(decoder.newlines, None)
 
1223
 
 
1224
# XXX Tests for open()
 
1225
 
 
1226
class MiscIOTest(unittest.TestCase):
 
1227
 
 
1228
    def testImport__all__(self):
 
1229
        for name in io.__all__:
 
1230
            obj = getattr(io, name, None)
 
1231
            self.assert_(obj is not None, name)
 
1232
            if name == "open":
 
1233
                continue
 
1234
            elif "error" in name.lower():
 
1235
                self.assert_(issubclass(obj, Exception), name)
 
1236
            else:
 
1237
                self.assert_(issubclass(obj, io.IOBase))
 
1238
 
 
1239
 
 
1240
def test_main():
 
1241
    test_support.run_unittest(IOTest, BytesIOTest, StringIOTest,
 
1242
                              BufferedReaderTest, BufferedWriterTest,
 
1243
                              BufferedRWPairTest, BufferedRandomTest,
 
1244
                              StatefulIncrementalDecoderTest,
 
1245
                              TextIOWrapperTest, MiscIOTest)
 
1246
 
 
1247
if __name__ == "__main__":
 
1248
    unittest.main()