~ubuntu-branches/ubuntu/maverick/python3.1/maverick

« back to all changes in this revision

Viewing changes to Lib/test/test_multibytecodec_support.py

  • Committer: Bazaar Package Importer
  • Author(s): Matthias Klose
  • Date: 2009-03-23 00:01:27 UTC
  • Revision ID: james.westby@ubuntu.com-20090323000127-5fstfxju4ufrhthq
Tags: upstream-3.1~a1+20090322
Import upstream version 3.1~a1+20090322

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#!/usr/bin/env python
 
2
#
 
3
# test_multibytecodec_support.py
 
4
#   Common Unittest Routines for CJK codecs
 
5
#
 
6
 
 
7
import sys, codecs
 
8
import unittest, re
 
9
from test import support
 
10
from io import BytesIO
 
11
 
 
12
class TestBase:
 
13
    encoding        = ''   # codec name
 
14
    codec           = None # codec tuple (with 4 elements)
 
15
    tstring         = None # must set. 2 strings to test StreamReader
 
16
 
 
17
    codectests      = None # must set. codec test tuple
 
18
    roundtriptest   = 1    # set if roundtrip is possible with unicode
 
19
    has_iso10646    = 0    # set if this encoding contains whole iso10646 map
 
20
    xmlcharnametest = None # string to test xmlcharrefreplace
 
21
    unmappedunicode = '\udeee' # a unicode codepoint that is not mapped.
 
22
 
 
23
    def setUp(self):
 
24
        if self.codec is None:
 
25
            self.codec = codecs.lookup(self.encoding)
 
26
        self.encode = self.codec.encode
 
27
        self.decode = self.codec.decode
 
28
        self.reader = self.codec.streamreader
 
29
        self.writer = self.codec.streamwriter
 
30
        self.incrementalencoder = self.codec.incrementalencoder
 
31
        self.incrementaldecoder = self.codec.incrementaldecoder
 
32
 
 
33
    def test_chunkcoding(self):
 
34
        tstring_lines = []
 
35
        for b in self.tstring:
 
36
            lines = b.split(b"\n")
 
37
            last = lines.pop()
 
38
            assert last == b""
 
39
            lines = [line + b"\n" for line in lines]
 
40
            tstring_lines.append(lines)
 
41
        for native, utf8 in zip(*tstring_lines):
 
42
            u = self.decode(native)[0]
 
43
            self.assertEqual(u, utf8.decode('utf-8'))
 
44
            if self.roundtriptest:
 
45
                self.assertEqual(native, self.encode(u)[0])
 
46
 
 
47
    def test_errorhandle(self):
 
48
        for source, scheme, expected in self.codectests:
 
49
            if isinstance(source, bytes):
 
50
                func = self.decode
 
51
            else:
 
52
                func = self.encode
 
53
            if expected:
 
54
                result = func(source, scheme)[0]
 
55
                if func is self.decode:
 
56
                    self.assert_(type(result) is str, type(result))
 
57
                else:
 
58
                    self.assert_(type(result) is bytes, type(result))
 
59
                self.assertEqual(result, expected)
 
60
            else:
 
61
                self.assertRaises(UnicodeError, func, source, scheme)
 
62
 
 
63
    def test_xmlcharrefreplace(self):
 
64
        if self.has_iso10646:
 
65
            return
 
66
 
 
67
        s = "\u0b13\u0b23\u0b60 nd eggs"
 
68
        self.assertEqual(
 
69
            self.encode(s, "xmlcharrefreplace")[0],
 
70
            b"ଓଣୠ nd eggs"
 
71
        )
 
72
 
 
73
    def test_customreplace_encode(self):
 
74
        if self.has_iso10646:
 
75
            return
 
76
 
 
77
        from html.entities import codepoint2name
 
78
 
 
79
        def xmlcharnamereplace(exc):
 
80
            if not isinstance(exc, UnicodeEncodeError):
 
81
                raise TypeError("don't know how to handle %r" % exc)
 
82
            l = []
 
83
            for c in exc.object[exc.start:exc.end]:
 
84
                if ord(c) in codepoint2name:
 
85
                    l.append("&%s;" % codepoint2name[ord(c)])
 
86
                else:
 
87
                    l.append("&#%d;" % ord(c))
 
88
            return ("".join(l), exc.end)
 
89
 
 
90
        codecs.register_error("test.xmlcharnamereplace", xmlcharnamereplace)
 
91
 
 
92
        if self.xmlcharnametest:
 
93
            sin, sout = self.xmlcharnametest
 
94
        else:
 
95
            sin = "\xab\u211c\xbb = \u2329\u1234\u232a"
 
96
            sout = b"«ℜ» = ⟨ሴ⟩"
 
97
        self.assertEqual(self.encode(sin,
 
98
                                    "test.xmlcharnamereplace")[0], sout)
 
99
 
 
100
    def test_callback_wrong_objects(self):
 
101
        def myreplace(exc):
 
102
            return (ret, exc.end)
 
103
        codecs.register_error("test.cjktest", myreplace)
 
104
 
 
105
        for ret in ([1, 2, 3], [], None, object(), b'string', b''):
 
106
            self.assertRaises(TypeError, self.encode, self.unmappedunicode,
 
107
                              'test.cjktest')
 
108
 
 
109
    def test_callback_long_index(self):
 
110
        def myreplace(exc):
 
111
            return ('x', int(exc.end))
 
112
        codecs.register_error("test.cjktest", myreplace)
 
113
        self.assertEqual(self.encode('abcd' + self.unmappedunicode + 'efgh',
 
114
                                     'test.cjktest'), (b'abcdxefgh', 9))
 
115
 
 
116
        def myreplace(exc):
 
117
            return ('x', sys.maxsize + 1)
 
118
        codecs.register_error("test.cjktest", myreplace)
 
119
        self.assertRaises(IndexError, self.encode, self.unmappedunicode,
 
120
                          'test.cjktest')
 
121
 
 
122
    def test_callback_None_index(self):
 
123
        def myreplace(exc):
 
124
            return ('x', None)
 
125
        codecs.register_error("test.cjktest", myreplace)
 
126
        self.assertRaises(TypeError, self.encode, self.unmappedunicode,
 
127
                          'test.cjktest')
 
128
 
 
129
    def test_callback_backward_index(self):
 
130
        def myreplace(exc):
 
131
            if myreplace.limit > 0:
 
132
                myreplace.limit -= 1
 
133
                return ('REPLACED', 0)
 
134
            else:
 
135
                return ('TERMINAL', exc.end)
 
136
        myreplace.limit = 3
 
137
        codecs.register_error("test.cjktest", myreplace)
 
138
        self.assertEqual(self.encode('abcd' + self.unmappedunicode + 'efgh',
 
139
                                     'test.cjktest'),
 
140
                (b'abcdREPLACEDabcdREPLACEDabcdREPLACEDabcdTERMINALefgh', 9))
 
141
 
 
142
    def test_callback_forward_index(self):
 
143
        def myreplace(exc):
 
144
            return ('REPLACED', exc.end + 2)
 
145
        codecs.register_error("test.cjktest", myreplace)
 
146
        self.assertEqual(self.encode('abcd' + self.unmappedunicode + 'efgh',
 
147
                                     'test.cjktest'), (b'abcdREPLACEDgh', 9))
 
148
 
 
149
    def test_callback_index_outofbound(self):
 
150
        def myreplace(exc):
 
151
            return ('TERM', 100)
 
152
        codecs.register_error("test.cjktest", myreplace)
 
153
        self.assertRaises(IndexError, self.encode, self.unmappedunicode,
 
154
                          'test.cjktest')
 
155
 
 
156
    def test_incrementalencoder(self):
 
157
        UTF8Reader = codecs.getreader('utf-8')
 
158
        for sizehint in [None] + list(range(1, 33)) + \
 
159
                        [64, 128, 256, 512, 1024]:
 
160
            istream = UTF8Reader(BytesIO(self.tstring[1]))
 
161
            ostream = BytesIO()
 
162
            encoder = self.incrementalencoder()
 
163
            while 1:
 
164
                if sizehint is not None:
 
165
                    data = istream.read(sizehint)
 
166
                else:
 
167
                    data = istream.read()
 
168
 
 
169
                if not data:
 
170
                    break
 
171
                e = encoder.encode(data)
 
172
                ostream.write(e)
 
173
 
 
174
            self.assertEqual(ostream.getvalue(), self.tstring[0])
 
175
 
 
176
    def test_incrementaldecoder(self):
 
177
        UTF8Writer = codecs.getwriter('utf-8')
 
178
        for sizehint in [None, -1] + list(range(1, 33)) + \
 
179
                        [64, 128, 256, 512, 1024]:
 
180
            istream = BytesIO(self.tstring[0])
 
181
            ostream = UTF8Writer(BytesIO())
 
182
            decoder = self.incrementaldecoder()
 
183
            while 1:
 
184
                data = istream.read(sizehint)
 
185
                if not data:
 
186
                    break
 
187
                else:
 
188
                    u = decoder.decode(data)
 
189
                    ostream.write(u)
 
190
 
 
191
            self.assertEqual(ostream.getvalue(), self.tstring[1])
 
192
 
 
193
    def test_incrementalencoder_error_callback(self):
 
194
        inv = self.unmappedunicode
 
195
 
 
196
        e = self.incrementalencoder()
 
197
        self.assertRaises(UnicodeEncodeError, e.encode, inv, True)
 
198
 
 
199
        e.errors = 'ignore'
 
200
        self.assertEqual(e.encode(inv, True), b'')
 
201
 
 
202
        e.reset()
 
203
        def tempreplace(exc):
 
204
            return ('called', exc.end)
 
205
        codecs.register_error('test.incremental_error_callback', tempreplace)
 
206
        e.errors = 'test.incremental_error_callback'
 
207
        self.assertEqual(e.encode(inv, True), b'called')
 
208
 
 
209
        # again
 
210
        e.errors = 'ignore'
 
211
        self.assertEqual(e.encode(inv, True), b'')
 
212
 
 
213
    def test_streamreader(self):
 
214
        UTF8Writer = codecs.getwriter('utf-8')
 
215
        for name in ["read", "readline", "readlines"]:
 
216
            for sizehint in [None, -1] + list(range(1, 33)) + \
 
217
                            [64, 128, 256, 512, 1024]:
 
218
                istream = self.reader(BytesIO(self.tstring[0]))
 
219
                ostream = UTF8Writer(BytesIO())
 
220
                func = getattr(istream, name)
 
221
                while 1:
 
222
                    data = func(sizehint)
 
223
                    if not data:
 
224
                        break
 
225
                    if name == "readlines":
 
226
                        ostream.writelines(data)
 
227
                    else:
 
228
                        ostream.write(data)
 
229
 
 
230
                self.assertEqual(ostream.getvalue(), self.tstring[1])
 
231
 
 
232
    def test_streamwriter(self):
 
233
        readfuncs = ('read', 'readline', 'readlines')
 
234
        UTF8Reader = codecs.getreader('utf-8')
 
235
        for name in readfuncs:
 
236
            for sizehint in [None] + list(range(1, 33)) + \
 
237
                            [64, 128, 256, 512, 1024]:
 
238
                istream = UTF8Reader(BytesIO(self.tstring[1]))
 
239
                ostream = self.writer(BytesIO())
 
240
                func = getattr(istream, name)
 
241
                while 1:
 
242
                    if sizehint is not None:
 
243
                        data = func(sizehint)
 
244
                    else:
 
245
                        data = func()
 
246
 
 
247
                    if not data:
 
248
                        break
 
249
                    if name == "readlines":
 
250
                        ostream.writelines(data)
 
251
                    else:
 
252
                        ostream.write(data)
 
253
 
 
254
                self.assertEqual(ostream.getvalue(), self.tstring[0])
 
255
 
 
256
# On a build where a character above U+FFFF is stored as two code units,
# shadow chr() and ord() with versions that build and read such pairs.
if len('\U00012345') == 2: # ucs2 build
    _unichr = chr
    _ord = ord

    def chr(v):
        # Values below 0x10000 need no splitting.
        if v < 0x10000:
            return _unichr(v)
        offset = v - 0x10000
        high = _unichr(0xd800 + (offset >> 10))
        low = _unichr(0xdc00 + (offset & 0x3ff))
        return high + low

    def ord(c):
        # Anything that is not exactly two units goes to the built-in.
        if len(c) != 2:
            return _ord(c)
        high_bits = (_ord(c[0]) - 0xd800) << 10
        low_bits = _ord(c[1]) - 0xdc00
        return 0x10000 + high_bits + low_bits
 
271
 
 
272
class TestBase_Mapping(unittest.TestCase):
    """Check a codec against a character mapping file.

    The class reads ``self.encoding`` and ``self.mapfileurl`` but defines
    neither, so subclasses have to provide both.
    """

    # (csetch, unich) pairs exempt from the encode / decode check in
    # _testpoint, respectively.
    pass_enctest = []
    pass_dectest = []
    # Extra (csetch, unich) pairs checked by test_mapping_supplemental.
    supmaps = []

    def __init__(self, *args, **kw):
        unittest.TestCase.__init__(self, *args, **kw)
        try:
            # test it to report the error early; the probe handle is closed
            # right away instead of being left open.
            self.open_mapping_file().close()
        except IOError:
            raise support.TestSkipped("Could not retrieve "+self.mapfileurl)

    def open_mapping_file(self):
        # Return an open file object for the mapping file at self.mapfileurl.
        return support.open_urlresource(self.mapfileurl)

    def test_mapping_file(self):
        # URLs ending in '.xml' use the XML reader, all others the plain one.
        if self.mapfileurl.endswith('.xml'):
            self._test_mapping_file_ucm()
        else:
            self._test_mapping_file_plain()

    def _test_mapping_file_plain(self):
        # Each usable line holds two whitespace-separated numeric fields
        # (text after '#' is ignored): the encoded value and one or more
        # code points joined by '+'.
        def unichrs(s):
            # int(..., 0) parses the prefixed numbers without eval(), so text
            # from a downloaded file is never executed as code.
            return ''.join(chr(int(part, 0)) for part in s.split('+'))
        urt_wa = {}

        with self.open_mapping_file() as mapfile:
            for line in mapfile:
                if not line:
                    break
                data = line.split('#')[0].strip().split()
                if len(data) != 2:
                    continue

                csetval = int(data[0], 0)
                if csetval <= 0x7F:
                    csetch = bytes([csetval & 0xff])
                elif csetval >= 0x1000000:
                    csetch = bytes([(csetval >> 24), ((csetval >> 16) & 0xff),
                                    ((csetval >> 8) & 0xff), (csetval & 0xff)])
                elif csetval >= 0x10000:
                    csetch = bytes([(csetval >> 16), ((csetval >> 8) & 0xff),
                                    (csetval & 0xff)])
                elif csetval >= 0x100:
                    csetch = bytes([(csetval >> 8), (csetval & 0xff)])
                else:
                    # Single-byte values 0x80-0xFF are not checked.
                    continue

                unich = unichrs(data[1])
                # Compare the string itself: ord() would raise TypeError for
                # an entry made of several code points.
                if unich == '\ufffd' or unich in urt_wa:
                    continue
                # Only the first mapping seen for a given string is checked.
                urt_wa[unich] = csetch

                self._testpoint(csetch, unich)

    def _test_mapping_file_ucm(self):
        with self.open_mapping_file() as mapfile:
            ucmdata = mapfile.read()
        uc = re.findall('<a u="([A-F0-9]{4})" b="([0-9A-F ]+)"/>', ucmdata)
        for uni, coded in uc:
            unich = chr(int(uni, 16))
            codech = bytes(int(c, 16) for c in coded.split())
            self._testpoint(codech, unich)

    def test_mapping_supplemental(self):
        for mapping in self.supmaps:
            self._testpoint(*mapping)

    def _testpoint(self, csetch, unich):
        # Check one mapping in both directions unless it is listed as exempt.
        if (csetch, unich) not in self.pass_enctest:
            self.assertEqual(unich.encode(self.encoding), csetch)
        if (csetch, unich) not in self.pass_dectest:
            self.assertEqual(str(csetch, self.encoding), unich)
 
342
 
 
343
def load_teststring(encoding):
    """Return the ``teststring`` entry stored under *encoding*.

    ``cjkencodings_test`` is imported inside the function, so importing
    this module does not load it.
    """
    from test import cjkencodings_test
    samples = cjkencodings_test.teststring
    return samples[encoding]