~ubuntu-branches/ubuntu/karmic/pypy/karmic

« back to all changes in this revision

Viewing changes to pypy/module/bz2/test/test_bz2_file.py

  • Committer: Bazaar Package Importer
  • Author(s): Alexandre Fayolle
  • Date: 2007-04-13 09:33:09 UTC
  • Revision ID: james.westby@ubuntu.com-20070413093309-yoojh4jcoocu2krz
Tags: upstream-1.0.0
ImportĀ upstreamĀ versionĀ 1.0.0

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
import py
 
2
from pypy.conftest import gettestobjspace
 
3
import os
 
4
 
 
5
if os.name == "nt":
 
6
    from py.test import skip
 
7
    skip("bz2 module is not available on Windows")
 
8
        
 
9
def setup_module(mod):
 
10
    DATA = 'BZh91AY&SY.\xc8N\x18\x00\x01>_\x80\x00\x10@\x02\xff\xf0\x01\x07n\x00?\xe7\xff\xe00\x01\x99\xaa\x00\xc0\x03F\x86\x8c#&\x83F\x9a\x03\x06\xa6\xd0\xa6\x93M\x0fQ\xa7\xa8\x06\x804hh\x12$\x11\xa4i4\xf14S\xd2<Q\xb5\x0fH\xd3\xd4\xdd\xd5\x87\xbb\xf8\x94\r\x8f\xafI\x12\xe1\xc9\xf8/E\x00pu\x89\x12]\xc9\xbbDL\nQ\x0e\t1\x12\xdf\xa0\xc0\x97\xac2O9\x89\x13\x94\x0e\x1c7\x0ed\x95I\x0c\xaaJ\xa4\x18L\x10\x05#\x9c\xaf\xba\xbc/\x97\x8a#C\xc8\xe1\x8cW\xf9\xe2\xd0\xd6M\xa7\x8bXa<e\x84t\xcbL\xb3\xa7\xd9\xcd\xd1\xcb\x84.\xaf\xb3\xab\xab\xad`n}\xa0lh\tE,\x8eZ\x15\x17VH>\x88\xe5\xcd9gd6\x0b\n\xe9\x9b\xd5\x8a\x99\xf7\x08.K\x8ev\xfb\xf7xw\xbb\xdf\xa1\x92\xf1\xdd|/";\xa2\xba\x9f\xd5\xb1#A\xb6\xf6\xb3o\xc9\xc5y\\\xebO\xe7\x85\x9a\xbc\xb6f8\x952\xd5\xd7"%\x89>V,\xf7\xa6z\xe2\x9f\xa3\xdf\x11\x11"\xd6E)I\xa9\x13^\xca\xf3r\xd0\x03U\x922\xf26\xec\xb6\xed\x8b\xc3U\x13\x9d\xc5\x170\xa4\xfa^\x92\xacDF\x8a\x97\xd6\x19\xfe\xdd\xb8\xbd\x1a\x9a\x19\xa3\x80ankR\x8b\xe5\xd83]\xa9\xc6\x08\x82f\xf6\xb9"6l$\xb8j@\xc0\x8a\xb0l1..\xbak\x83ls\x15\xbc\xf4\xc1\x13\xbe\xf8E\xb8\x9d\r\xa8\x9dk\x84\xd3n\xfa\xacQ\x07\xb1%y\xaav\xb4\x08\xe0z\x1b\x16\xf5\x04\xe9\xcc\xb9\x08z\x1en7.G\xfc]\xc9\x14\xe1B@\xbb!8`'
 
11
    DATA_CRLF = 'BZh91AY&SY\xaez\xbbN\x00\x01H\xdf\x80\x00\x12@\x02\xff\xf0\x01\x07n\x00?\xe7\xff\xe0@\x01\xbc\xc6`\x86*\x8d=M\xa9\x9a\x86\xd0L@\x0fI\xa6!\xa1\x13\xc8\x88jdi\x8d@\x03@\x1a\x1a\x0c\x0c\x83 \x00\xc4h2\x19\x01\x82D\x84e\t\xe8\x99\x89\x19\x1ah\x00\r\x1a\x11\xaf\x9b\x0fG\xf5(\x1b\x1f?\t\x12\xcf\xb5\xfc\x95E\x00ps\x89\x12^\xa4\xdd\xa2&\x05(\x87\x04\x98\x89u\xe40%\xb6\x19\'\x8c\xc4\x89\xca\x07\x0e\x1b!\x91UIFU%C\x994!DI\xd2\xfa\xf0\xf1N8W\xde\x13A\xf5\x9cr%?\x9f3;I45A\xd1\x8bT\xb1<l\xba\xcb_\xc00xY\x17r\x17\x88\x08\x08@\xa0\ry@\x10\x04$)`\xf2\xce\x89z\xb0s\xec\x9b.iW\x9d\x81\xb5-+t\x9f\x1a\'\x97dB\xf5x\xb5\xbe.[.\xd7\x0e\x81\xe7\x08\x1cN`\x88\x10\xca\x87\xc3!"\x80\x92R\xa1/\xd1\xc0\xe6mf\xac\xbd\x99\xcca\xb3\x8780>\xa4\xc7\x8d\x1a\\"\xad\xa1\xabyBg\x15\xb9l\x88\x88\x91k"\x94\xa4\xd4\x89\xae*\xa6\x0b\x10\x0c\xd6\xd4m\xe86\xec\xb5j\x8a\x86j\';\xca.\x01I\xf2\xaaJ\xe8\x88\x8cU+t3\xfb\x0c\n\xa33\x13r2\r\x16\xe0\xb3(\xbf\x1d\x83r\xe7M\xf0D\x1365\xd8\x88\xd3\xa4\x92\xcb2\x06\x04\\\xc1\xb0\xea//\xbek&\xd8\xe6+t\xe5\xa1\x13\xada\x16\xder5"w]\xa2i\xb7[\x97R \xe2IT\xcd;Z\x04dk4\xad\x8a\t\xd3\x81z\x10\xf1:^`\xab\x1f\xc5\xdc\x91N\x14$+\x9e\xae\xd3\x80'
 
12
 
 
13
    def create_temp_file(crlf=False):
 
14
        f = py.test.ensuretemp("bz2").join("foo")
 
15
 
 
16
        data = (DATA, DATA_CRLF)[crlf]
 
17
        f.write(data)
 
18
    
 
19
    def decompress(data):
 
20
        import popen2
 
21
        import bz2
 
22
        pop = popen2.Popen3("bunzip2", capturestderr=1)
 
23
        pop.tochild.write(data)
 
24
        pop.tochild.close()
 
25
        res = pop.fromchild.read()
 
26
        pop.fromchild.close()
 
27
        if pop.wait() != 0:
 
28
            res = bz2.decompress(data)
 
29
        return res
 
30
 
 
31
    mod.TEXT = 'root:x:0:0:root:/root:/bin/bash\nbin:x:1:1:bin:/bin:\ndaemon:x:2:2:daemon:/sbin:\nadm:x:3:4:adm:/var/adm:\nlp:x:4:7:lp:/var/spool/lpd:\nsync:x:5:0:sync:/sbin:/bin/sync\nshutdown:x:6:0:shutdown:/sbin:/sbin/shutdown\nhalt:x:7:0:halt:/sbin:/sbin/halt\nmail:x:8:12:mail:/var/spool/mail:\nnews:x:9:13:news:/var/spool/news:\nuucp:x:10:14:uucp:/var/spool/uucp:\noperator:x:11:0:operator:/root:\ngames:x:12:100:games:/usr/games:\ngopher:x:13:30:gopher:/usr/lib/gopher-data:\nftp:x:14:50:FTP User:/var/ftp:/bin/bash\nnobody:x:65534:65534:Nobody:/home:\npostfix:x:100:101:postfix:/var/spool/postfix:\nniemeyer:x:500:500::/home/niemeyer:/bin/bash\npostgres:x:101:102:PostgreSQL Server:/var/lib/pgsql:/bin/bash\nmysql:x:102:103:MySQL server:/var/lib/mysql:/bin/bash\nwww:x:103:104::/var/www:/bin/false\n'
 
32
    mod.DATA = DATA
 
33
    mod.DATA_CRLF = DATA_CRLF 
 
34
    mod.create_temp_file = create_temp_file
 
35
    mod.decompress = decompress
 
36
 
 
37
class AppTestBZ2File:
 
38
    def setup_class(cls):
 
39
        space = gettestobjspace(usemodules=('bz2',))
 
40
        cls.space = space
 
41
        cls.w_TEXT = space.wrap(TEXT)
 
42
        cls.w_DATA = space.wrap(DATA)
 
43
        cls.w_DATA_CRLF = space.wrap(DATA_CRLF)
 
44
        cls.w_temppath = space.wrap(str(py.test.ensuretemp("bz2").join("foo")))
 
45
        cls.w_create_temp_file = space.wrap(create_temp_file)
 
46
        cls.w_decompress = space.wrap(decompress)
 
47
        
 
48
    def test_attributes(self):
 
49
        from bz2 import BZ2File
 
50
        
 
51
        bz2f = BZ2File(self.temppath, mode="w")
 
52
        assert bz2f.name == self.temppath
 
53
        assert bz2f.newlines == None
 
54
        assert bz2f.mode == "wb"
 
55
        assert bz2f.softspace == False
 
56
        assert bz2f.closed == False
 
57
        bz2f.close()
 
58
        assert bz2f.closed == True
 
59
    
 
60
    def test_creation(self):
 
61
        from bz2 import BZ2File
 
62
        
 
63
        raises(ValueError, BZ2File, self.temppath, mode='w', compresslevel=10)
 
64
        raises(IOError, BZ2File, self.temppath, mode='XYZ')
 
65
        # XXX the following is fine, currently:
 
66
        #raises(ValueError, BZ2File, self.temppath, mode='ww')
 
67
        
 
68
        BZ2File(self.temppath, mode='wU', buffering=0, compresslevel=8)
 
69
        BZ2File(self.temppath, mode='wb')
 
70
        # a large buf size
 
71
        BZ2File(self.temppath, mode='w', buffering=4096)
 
72
 
 
73
    def test_close(self):
 
74
        from bz2 import BZ2File
 
75
        
 
76
        # writeonly
 
77
        bz2f = BZ2File(self.temppath, mode='w')
 
78
        bz2f.close()
 
79
        # since we use fclose() internally you can't close it twice
 
80
        # bz2f.close()
 
81
        
 
82
        # readonly
 
83
        bz2f = BZ2File(self.temppath, mode='r')
 
84
        bz2f.close()
 
85
        
 
86
    def test_tell(self):
 
87
        from bz2 import BZ2File
 
88
        
 
89
        bz2f = BZ2File(self.temppath, mode='w')
 
90
        bz2f.close()
 
91
        raises(ValueError, bz2f.tell)
 
92
        
 
93
        bz2f = BZ2File(self.temppath, mode='w')
 
94
        pos = bz2f.tell()
 
95
        assert pos == 0
 
96
    
 
97
    def test_seek(self):
 
98
        from bz2 import BZ2File
 
99
        
 
100
        # hack to create a foo file
 
101
        open(self.temppath, "w").close()
 
102
        
 
103
        # cannot seek if close
 
104
        bz2f = BZ2File(self.temppath, mode='r')
 
105
        bz2f.close()
 
106
        raises(ValueError, bz2f.seek, 0)
 
107
        
 
108
        # cannot seek if 'w'
 
109
        bz2f = BZ2File(self.temppath, mode='w')
 
110
        raises(IOError, bz2f.seek, 0)
 
111
        bz2f.close()
 
112
        
 
113
        bz2f = BZ2File(self.temppath, mode='r')
 
114
        raises(TypeError, bz2f.seek)
 
115
        raises(TypeError, bz2f.seek, "foo")
 
116
        raises(TypeError, bz2f.seek, 0, "foo")
 
117
        
 
118
        bz2f.seek(0)
 
119
        assert bz2f.tell() == 0
 
120
 
 
121
    def test_open_close_del(self):
 
122
        from bz2 import BZ2File
 
123
        self.create_temp_file()
 
124
        
 
125
        for i in range(10):
 
126
            f = BZ2File(self.temppath)
 
127
            f.close()
 
128
            del f
 
129
    
 
130
    def test_open_non_existent(self):
 
131
        from bz2 import BZ2File
 
132
        raises(IOError, BZ2File, "/non/existent/path")
 
133
    
 
134
    def test_open_mode_U(self):
 
135
        # bug #1194181: bz2.BZ2File opened for write with mode "U"
 
136
        from bz2 import BZ2File
 
137
        self.create_temp_file()
 
138
        
 
139
        bz2f = BZ2File(self.temppath, "U")
 
140
        bz2f.close()
 
141
        f = open(self.temppath)
 
142
        f.seek(0, 2)
 
143
        f.read()
 
144
        assert f.tell() == len(self.DATA)
 
145
        f.close()
 
146
    
 
147
    def test_seek_forward(self):
 
148
        from bz2 import BZ2File
 
149
        self.create_temp_file()
 
150
 
 
151
        bz2f = BZ2File(self.temppath)
 
152
        bz2f.seek(150) # (150, 0)
 
153
        assert bz2f.read() == self.TEXT[150:]
 
154
        bz2f.close()
 
155
 
 
156
    def test_seek_backwards(self):
 
157
        #skip("currently does not work")
 
158
        from bz2 import BZ2File
 
159
        self.create_temp_file()
 
160
 
 
161
        bz2f = BZ2File(self.temppath)
 
162
        bz2f.read(500)
 
163
        bz2f.seek(-150, 1)
 
164
        assert bz2f.read() == self.TEXT[500 - 150:]
 
165
        bz2f.close()
 
166
 
 
167
    def test_seek_backwards_from_end(self):
 
168
        #skip("currently does not work")
 
169
        from bz2 import BZ2File
 
170
        self.create_temp_file()
 
171
 
 
172
        bz2f = BZ2File(self.temppath)
 
173
        bz2f.seek(-150, 2)
 
174
        assert bz2f.read() == self.TEXT[len(self.TEXT) - 150:]
 
175
        bz2f.close()
 
176
 
 
177
    def test_seek_post_end(self):
 
178
        from bz2 import BZ2File
 
179
        self.create_temp_file()
 
180
 
 
181
        bz2f = BZ2File(self.temppath)
 
182
        bz2f.seek(150000)
 
183
        assert bz2f.tell() == len(self.TEXT)
 
184
        assert bz2f.read() == ""
 
185
        bz2f.close()
 
186
    
 
187
    def test_seek_post_end_twice(self):
 
188
        from bz2 import BZ2File
 
189
        self.create_temp_file()
 
190
 
 
191
        bz2f = BZ2File(self.temppath)
 
192
        bz2f.seek(150000)
 
193
        bz2f.seek(150000)
 
194
        assert bz2f.tell() == len(self.TEXT)
 
195
        assert bz2f.read() == ""
 
196
        bz2f.close()
 
197
 
 
198
    def test_seek_pre_start(self):
 
199
        from bz2 import BZ2File
 
200
        self.create_temp_file()
 
201
 
 
202
        bz2f = BZ2File(self.temppath)
 
203
        bz2f.seek(-150)
 
204
        assert bz2f.tell() == 0
 
205
        assert bz2f.read() == self.TEXT
 
206
        bz2f.close()
 
207
 
 
208
    def test_readline(self):
 
209
        from bz2 import BZ2File
 
210
        from cStringIO import StringIO
 
211
        self.create_temp_file()
 
212
        
 
213
        bz2f = BZ2File(self.temppath)
 
214
        # XXX
 
215
        #raises(TypeError, bz2f.readline, None)
 
216
        sio = StringIO(self.TEXT)
 
217
        for line in sio.readlines():
 
218
            line_read = bz2f.readline()
 
219
            assert line_read == line
 
220
        bz2f.close()
 
221
 
 
222
    def test_read(self):
 
223
        from bz2 import BZ2File
 
224
        self.create_temp_file()
 
225
        
 
226
        bz2f = BZ2File(self.temppath)
 
227
        # XXX
 
228
        # raises(TypeError, bz2f.read, None)
 
229
        text_read = bz2f.read()
 
230
        assert text_read == self.TEXT
 
231
        bz2f.close()
 
232
 
 
233
    def test_read_chunk10(self):
 
234
        from bz2 import BZ2File
 
235
        self.create_temp_file()
 
236
        
 
237
        bz2f = BZ2File(self.temppath)
 
238
        text_read = ""
 
239
        while True:
 
240
            data = bz2f.read(10)
 
241
            if not data:
 
242
                break
 
243
            text_read = "%s%s" % (text_read, data)
 
244
        assert text_read == self.TEXT
 
245
        bz2f.close()
 
246
 
 
247
    def test_read_100_bytes(self):
 
248
        from bz2 import BZ2File
 
249
        self.create_temp_file()
 
250
        
 
251
        bz2f = BZ2File(self.temppath)
 
252
        assert bz2f.read(100) == self.TEXT[:100]
 
253
        bz2f.close()
 
254
 
 
255
    def test_universal_newlines_lf(self):
 
256
        from bz2 import BZ2File
 
257
        self.create_temp_file()
 
258
        
 
259
        bz2f = BZ2File(self.temppath, "rU")
 
260
        assert bz2f.read() == self.TEXT
 
261
        assert bz2f.newlines == "\n"
 
262
        bz2f.close()
 
263
 
 
264
    def test_universal_newlines_crlf(self):
 
265
        from bz2 import BZ2File
 
266
        self.create_temp_file(crlf=True)
 
267
        
 
268
        bz2f = BZ2File(self.temppath, "rU")
 
269
        data = bz2f.read()
 
270
        assert data == self.TEXT
 
271
        assert bz2f.newlines == "\r\n"
 
272
        bz2f.close()
 
273
 
 
274
    def test_readlines(self):
 
275
        from bz2 import BZ2File
 
276
        from cStringIO import StringIO
 
277
        self.create_temp_file()
 
278
        
 
279
        bz2f = BZ2File(self.temppath)
 
280
        # XXX
 
281
        #raises(TypeError, bz2f.readlines, None)
 
282
        sio = StringIO(self.TEXT)
 
283
        assert bz2f.readlines() == sio.readlines()
 
284
        bz2f.close()
 
285
 
 
286
    def test_iterator(self):
 
287
        from bz2 import BZ2File
 
288
        from cStringIO import StringIO
 
289
        self.create_temp_file()
 
290
        
 
291
        bz2f = BZ2File(self.temppath)
 
292
        sio = StringIO(self.TEXT)
 
293
        assert list(iter(bz2f)) == sio.readlines()
 
294
        bz2f.close()
 
295
        
 
296
    def test_xreadlines(self):
 
297
        from bz2 import BZ2File
 
298
        from cStringIO import StringIO
 
299
        self.create_temp_file()
 
300
        
 
301
        bz2f = BZ2File(self.temppath)
 
302
        sio = StringIO(self.TEXT)
 
303
        assert list(bz2f.xreadlines()) == sio.readlines()
 
304
        bz2f.close()
 
305
 
 
306
    def test_readlines_bug_1191043(self):
 
307
        # readlines()/xreadlines() for files containing no newline
 
308
        from bz2 import BZ2File
 
309
        
 
310
        DATA = 'BZh91AY&SY\xd9b\x89]\x00\x00\x00\x03\x80\x04\x00\x02\x00\x0c\x00 \x00!\x9ah3M\x13<]\xc9\x14\xe1BCe\x8a%t'
 
311
        f = open(self.temppath, "wb")
 
312
        f.write(DATA)
 
313
        f.close()
 
314
        
 
315
        bz2f = BZ2File(self.temppath)
 
316
        lines = bz2f.readlines()
 
317
        bz2f.close()
 
318
        assert lines == ['Test']
 
319
 
 
320
        bz2f = BZ2File(self.temppath)
 
321
        xlines = list(bz2f.xreadlines())
 
322
        bz2f.close()
 
323
        assert xlines == ['Test']
 
324
    
 
325
    def test_write(self):
 
326
        from bz2 import BZ2File
 
327
 
 
328
        bz2f = BZ2File(self.temppath, 'w')
 
329
        raises(TypeError, bz2f.write)
 
330
        bz2f.write(self.TEXT)
 
331
        bz2f.close()
 
332
        
 
333
        f = open(self.temppath, "rb")
 
334
        assert self.decompress(f.read()) == self.TEXT
 
335
        f.close()
 
336
 
 
337
    def test_write_chunks_10(self):
 
338
        from bz2 import BZ2File
 
339
 
 
340
        bz2f = BZ2File(self.temppath, 'w')
 
341
        n = 0
 
342
        while True:
 
343
            data = self.TEXT[n * 10:(n + 1) * 10]
 
344
            if not data:
 
345
                break
 
346
            
 
347
            bz2f.write(data)
 
348
            n += 1
 
349
        bz2f.close()
 
350
        
 
351
        f = open(self.temppath, "rb")
 
352
        assert self.decompress(f.read()) == self.TEXT
 
353
        f.close()
 
354
 
 
355
    def test_writelines(self):
 
356
        from bz2 import BZ2File
 
357
        from cStringIO import StringIO
 
358
 
 
359
        bz2f = BZ2File(self.temppath, 'w')
 
360
        raises(TypeError, bz2f.writelines)
 
361
        sio = StringIO(self.TEXT)
 
362
        bz2f.writelines(sio.readlines())
 
363
        bz2f.close()
 
364
        f = open(self.temppath, "rb")
 
365
        assert self.decompress(f.read()) == self.TEXT
 
366
        f.close()
 
367
        
 
368
    def test_write_methods_on_readonly_file(self):
 
369
        from bz2 import BZ2File
 
370
 
 
371
        bz2f = BZ2File(self.temppath, 'r')
 
372
        raises(IOError, bz2f.write, "abc")
 
373
        raises(IOError, bz2f.writelines, ["abc"])
 
374
        bz2f.close()
 
375
        
 
376
# has_cmdline_bunzip2 = sys.platform not in ("win32", "os2emx", "riscos")
 
377
 
378
# if has_cmdline_bunzip2:
 
379
#     def decompress(self, data):
 
380
#         pop = popen2.Popen3("bunzip2", capturestderr=1)
 
381
#         pop.tochild.write(data)
 
382
#         pop.tochild.close()
 
383
#         ret = pop.fromchild.read()
 
384
#         pop.fromchild.close()
 
385
#         if pop.wait() != 0:
 
386
#             ret = bz2.decompress(data)
 
387
#         return ret
 
388
 
389
# else:
 
390
#     # popen2.Popen3 doesn't exist on Windows, and even if it did, bunzip2
 
391
#     # isn't available to run.
 
392
#     def decompress(self, data):
 
393
#         return bz2.decompress(data)