~ubuntu-branches/ubuntu/trusty/python3.4/trusty-proposed

« back to all changes in this revision

Viewing changes to Lib/test/test_tarfile.py

  • Committer: Package Import Robot
  • Author(s): Matthias Klose
  • Date: 2013-11-25 09:44:27 UTC
  • Revision ID: package-import@ubuntu.com-20131125094427-lzxj8ap5w01lmo7f
Tags: upstream-3.4~b1
ImportĀ upstreamĀ versionĀ 3.4~b1

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
import sys
 
2
import os
 
3
import io
 
4
import shutil
 
5
from hashlib import md5
 
6
 
 
7
import unittest
 
8
import tarfile
 
9
 
 
10
from test import support, script_helper
 
11
 
 
12
# Check for our compression modules.
 
13
try:
 
14
    import gzip
 
15
except ImportError:
 
16
    gzip = None
 
17
try:
 
18
    import bz2
 
19
except ImportError:
 
20
    bz2 = None
 
21
try:
 
22
    import lzma
 
23
except ImportError:
 
24
    lzma = None
 
25
 
 
26
def md5sum(data):
 
27
    return md5(data).hexdigest()
 
28
 
 
29
TEMPDIR = os.path.abspath(support.TESTFN) + "-tardir"
 
30
tarextdir = TEMPDIR + '-extract-test'
 
31
tarname = support.findfile("testtar.tar")
 
32
gzipname = os.path.join(TEMPDIR, "testtar.tar.gz")
 
33
bz2name = os.path.join(TEMPDIR, "testtar.tar.bz2")
 
34
xzname = os.path.join(TEMPDIR, "testtar.tar.xz")
 
35
tmpname = os.path.join(TEMPDIR, "tmp.tar")
 
36
dotlessname = os.path.join(TEMPDIR, "testtar")
 
37
 
 
38
md5_regtype = "65f477c818ad9e15f7feab0c6d37742f"
 
39
md5_sparse = "a54fbc4ca4f4399a90e1b27164012fc6"
 
40
 
 
41
 
 
42
class TarTest:
 
43
    tarname = tarname
 
44
    suffix = ''
 
45
    open = io.FileIO
 
46
 
 
47
    @property
 
48
    def mode(self):
 
49
        return self.prefix + self.suffix
 
50
 
 
51
@support.requires_gzip
 
52
class GzipTest:
 
53
    tarname = gzipname
 
54
    suffix = 'gz'
 
55
    open = gzip.GzipFile if gzip else None
 
56
 
 
57
@support.requires_bz2
 
58
class Bz2Test:
 
59
    tarname = bz2name
 
60
    suffix = 'bz2'
 
61
    open = bz2.BZ2File if bz2 else None
 
62
 
 
63
@support.requires_lzma
 
64
class LzmaTest:
 
65
    tarname = xzname
 
66
    suffix = 'xz'
 
67
    open = lzma.LZMAFile if lzma else None
 
68
 
 
69
 
 
70
class ReadTest(TarTest):
 
71
 
 
72
    prefix = "r:"
 
73
 
 
74
    def setUp(self):
 
75
        self.tar = tarfile.open(self.tarname, mode=self.mode,
 
76
                                encoding="iso8859-1")
 
77
 
 
78
    def tearDown(self):
 
79
        self.tar.close()
 
80
 
 
81
 
 
82
class UstarReadTest(ReadTest, unittest.TestCase):
 
83
 
 
84
    def test_fileobj_regular_file(self):
 
85
        tarinfo = self.tar.getmember("ustar/regtype")
 
86
        with self.tar.extractfile(tarinfo) as fobj:
 
87
            data = fobj.read()
 
88
            self.assertEqual(len(data), tarinfo.size,
 
89
                    "regular file extraction failed")
 
90
            self.assertEqual(md5sum(data), md5_regtype,
 
91
                    "regular file extraction failed")
 
92
 
 
93
    def test_fileobj_readlines(self):
 
94
        self.tar.extract("ustar/regtype", TEMPDIR)
 
95
        tarinfo = self.tar.getmember("ustar/regtype")
 
96
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1:
 
97
            lines1 = fobj1.readlines()
 
98
 
 
99
        with self.tar.extractfile(tarinfo) as fobj:
 
100
            fobj2 = io.TextIOWrapper(fobj)
 
101
            lines2 = fobj2.readlines()
 
102
            self.assertEqual(lines1, lines2,
 
103
                    "fileobj.readlines() failed")
 
104
            self.assertEqual(len(lines2), 114,
 
105
                    "fileobj.readlines() failed")
 
106
            self.assertEqual(lines2[83],
 
107
                    "I will gladly admit that Python is not the fastest "
 
108
                    "running scripting language.\n",
 
109
                    "fileobj.readlines() failed")
 
110
 
 
111
    def test_fileobj_iter(self):
 
112
        self.tar.extract("ustar/regtype", TEMPDIR)
 
113
        tarinfo = self.tar.getmember("ustar/regtype")
 
114
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1:
 
115
            lines1 = fobj1.readlines()
 
116
        with self.tar.extractfile(tarinfo) as fobj2:
 
117
            lines2 = list(io.TextIOWrapper(fobj2))
 
118
            self.assertEqual(lines1, lines2,
 
119
                    "fileobj.__iter__() failed")
 
120
 
 
121
    def test_fileobj_seek(self):
 
122
        self.tar.extract("ustar/regtype", TEMPDIR)
 
123
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "rb") as fobj:
 
124
            data = fobj.read()
 
125
 
 
126
        tarinfo = self.tar.getmember("ustar/regtype")
 
127
        fobj = self.tar.extractfile(tarinfo)
 
128
 
 
129
        text = fobj.read()
 
130
        fobj.seek(0)
 
131
        self.assertEqual(0, fobj.tell(),
 
132
                     "seek() to file's start failed")
 
133
        fobj.seek(2048, 0)
 
134
        self.assertEqual(2048, fobj.tell(),
 
135
                     "seek() to absolute position failed")
 
136
        fobj.seek(-1024, 1)
 
137
        self.assertEqual(1024, fobj.tell(),
 
138
                     "seek() to negative relative position failed")
 
139
        fobj.seek(1024, 1)
 
140
        self.assertEqual(2048, fobj.tell(),
 
141
                     "seek() to positive relative position failed")
 
142
        s = fobj.read(10)
 
143
        self.assertEqual(s, data[2048:2058],
 
144
                     "read() after seek failed")
 
145
        fobj.seek(0, 2)
 
146
        self.assertEqual(tarinfo.size, fobj.tell(),
 
147
                     "seek() to file's end failed")
 
148
        self.assertEqual(fobj.read(), b"",
 
149
                     "read() at file's end did not return empty string")
 
150
        fobj.seek(-tarinfo.size, 2)
 
151
        self.assertEqual(0, fobj.tell(),
 
152
                     "relative seek() to file's end failed")
 
153
        fobj.seek(512)
 
154
        s1 = fobj.readlines()
 
155
        fobj.seek(512)
 
156
        s2 = fobj.readlines()
 
157
        self.assertEqual(s1, s2,
 
158
                     "readlines() after seek failed")
 
159
        fobj.seek(0)
 
160
        self.assertEqual(len(fobj.readline()), fobj.tell(),
 
161
                     "tell() after readline() failed")
 
162
        fobj.seek(512)
 
163
        self.assertEqual(len(fobj.readline()) + 512, fobj.tell(),
 
164
                     "tell() after seek() and readline() failed")
 
165
        fobj.seek(0)
 
166
        line = fobj.readline()
 
167
        self.assertEqual(fobj.read(), data[len(line):],
 
168
                     "read() after readline() failed")
 
169
        fobj.close()
 
170
 
 
171
    def test_fileobj_text(self):
 
172
        with self.tar.extractfile("ustar/regtype") as fobj:
 
173
            fobj = io.TextIOWrapper(fobj)
 
174
            data = fobj.read().encode("iso8859-1")
 
175
            self.assertEqual(md5sum(data), md5_regtype)
 
176
            try:
 
177
                fobj.seek(100)
 
178
            except AttributeError:
 
179
                # Issue #13815: seek() complained about a missing
 
180
                # flush() method.
 
181
                self.fail("seeking failed in text mode")
 
182
 
 
183
    # Test if symbolic and hard links are resolved by extractfile().  The
 
184
    # test link members each point to a regular member whose data is
 
185
    # supposed to be exported.
 
186
    def _test_fileobj_link(self, lnktype, regtype):
 
187
        with self.tar.extractfile(lnktype) as a, \
 
188
             self.tar.extractfile(regtype) as b:
 
189
            self.assertEqual(a.name, b.name)
 
190
 
 
191
    def test_fileobj_link1(self):
 
192
        self._test_fileobj_link("ustar/lnktype", "ustar/regtype")
 
193
 
 
194
    def test_fileobj_link2(self):
 
195
        self._test_fileobj_link("./ustar/linktest2/lnktype",
 
196
                                "ustar/linktest1/regtype")
 
197
 
 
198
    def test_fileobj_symlink1(self):
 
199
        self._test_fileobj_link("ustar/symtype", "ustar/regtype")
 
200
 
 
201
    def test_fileobj_symlink2(self):
 
202
        self._test_fileobj_link("./ustar/linktest2/symtype",
 
203
                                "ustar/linktest1/regtype")
 
204
 
 
205
    def test_issue14160(self):
 
206
        self._test_fileobj_link("symtype2", "ustar/regtype")
 
207
 
 
208
class GzipUstarReadTest(GzipTest, UstarReadTest):
 
209
    pass
 
210
 
 
211
class Bz2UstarReadTest(Bz2Test, UstarReadTest):
 
212
    pass
 
213
 
 
214
class LzmaUstarReadTest(LzmaTest, UstarReadTest):
 
215
    pass
 
216
 
 
217
 
 
218
class CommonReadTest(ReadTest):
 
219
 
 
220
    def test_empty_tarfile(self):
 
221
        # Test for issue6123: Allow opening empty archives.
 
222
        # This test checks if tarfile.open() is able to open an empty tar
 
223
        # archive successfully. Note that an empty tar archive is not the
 
224
        # same as an empty file!
 
225
        with tarfile.open(tmpname, self.mode.replace("r", "w")):
 
226
            pass
 
227
        try:
 
228
            tar = tarfile.open(tmpname, self.mode)
 
229
            tar.getnames()
 
230
        except tarfile.ReadError:
 
231
            self.fail("tarfile.open() failed on empty archive")
 
232
        else:
 
233
            self.assertListEqual(tar.getmembers(), [])
 
234
        finally:
 
235
            tar.close()
 
236
 
 
237
    def test_null_tarfile(self):
 
238
        # Test for issue6123: Allow opening empty archives.
 
239
        # This test guarantees that tarfile.open() does not treat an empty
 
240
        # file as an empty tar archive.
 
241
        with open(tmpname, "wb"):
 
242
            pass
 
243
        self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, self.mode)
 
244
        self.assertRaises(tarfile.ReadError, tarfile.open, tmpname)
 
245
 
 
246
    def test_ignore_zeros(self):
 
247
        # Test TarFile's ignore_zeros option.
 
248
        for char in (b'\0', b'a'):
 
249
            # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a')
 
250
            # are ignored correctly.
 
251
            with self.open(tmpname, "w") as fobj:
 
252
                fobj.write(char * 1024)
 
253
                fobj.write(tarfile.TarInfo("foo").tobuf())
 
254
 
 
255
            tar = tarfile.open(tmpname, mode="r", ignore_zeros=True)
 
256
            try:
 
257
                self.assertListEqual(tar.getnames(), ["foo"],
 
258
                    "ignore_zeros=True should have skipped the %r-blocks" %
 
259
                    char)
 
260
            finally:
 
261
                tar.close()
 
262
 
 
263
 
 
264
class MiscReadTestBase(CommonReadTest):
 
265
    def test_no_name_argument(self):
 
266
        with open(self.tarname, "rb") as fobj:
 
267
            tar = tarfile.open(fileobj=fobj, mode=self.mode)
 
268
            self.assertEqual(tar.name, os.path.abspath(fobj.name))
 
269
 
 
270
    def test_no_name_attribute(self):
 
271
        with open(self.tarname, "rb") as fobj:
 
272
            data = fobj.read()
 
273
        fobj = io.BytesIO(data)
 
274
        self.assertRaises(AttributeError, getattr, fobj, "name")
 
275
        tar = tarfile.open(fileobj=fobj, mode=self.mode)
 
276
        self.assertEqual(tar.name, None)
 
277
 
 
278
    def test_empty_name_attribute(self):
 
279
        with open(self.tarname, "rb") as fobj:
 
280
            data = fobj.read()
 
281
        fobj = io.BytesIO(data)
 
282
        fobj.name = ""
 
283
        with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
 
284
            self.assertEqual(tar.name, None)
 
285
 
 
286
    def test_fileobj_with_offset(self):
 
287
        # Skip the first member and store values from the second member
 
288
        # of the testtar.
 
289
        tar = tarfile.open(self.tarname, mode=self.mode)
 
290
        try:
 
291
            tar.next()
 
292
            t = tar.next()
 
293
            name = t.name
 
294
            offset = t.offset
 
295
            with tar.extractfile(t) as f:
 
296
                data = f.read()
 
297
        finally:
 
298
            tar.close()
 
299
 
 
300
        # Open the testtar and seek to the offset of the second member.
 
301
        with self.open(self.tarname) as fobj:
 
302
            fobj.seek(offset)
 
303
 
 
304
            # Test if the tarfile starts with the second member.
 
305
            tar = tar.open(self.tarname, mode="r:", fileobj=fobj)
 
306
            t = tar.next()
 
307
            self.assertEqual(t.name, name)
 
308
            # Read to the end of fileobj and test if seeking back to the
 
309
            # beginning works.
 
310
            tar.getmembers()
 
311
            self.assertEqual(tar.extractfile(t).read(), data,
 
312
                    "seek back did not work")
 
313
            tar.close()
 
314
 
 
315
    def test_fail_comp(self):
 
316
        # For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file.
 
317
        self.assertRaises(tarfile.ReadError, tarfile.open, tarname, self.mode)
 
318
        with open(tarname, "rb") as fobj:
 
319
            self.assertRaises(tarfile.ReadError, tarfile.open,
 
320
                              fileobj=fobj, mode=self.mode)
 
321
 
 
322
    def test_v7_dirtype(self):
 
323
        # Test old style dirtype member (bug #1336623):
 
324
        # Old V7 tars create directory members using an AREGTYPE
 
325
        # header with a "/" appended to the filename field.
 
326
        tarinfo = self.tar.getmember("misc/dirtype-old-v7")
 
327
        self.assertEqual(tarinfo.type, tarfile.DIRTYPE,
 
328
                "v7 dirtype failed")
 
329
 
 
330
    def test_xstar_type(self):
 
331
        # The xstar format stores extra atime and ctime fields inside the
 
332
        # space reserved for the prefix field. The prefix field must be
 
333
        # ignored in this case, otherwise it will mess up the name.
 
334
        try:
 
335
            self.tar.getmember("misc/regtype-xstar")
 
336
        except KeyError:
 
337
            self.fail("failed to find misc/regtype-xstar (mangled prefix?)")
 
338
 
 
339
    def test_check_members(self):
 
340
        for tarinfo in self.tar:
 
341
            self.assertEqual(int(tarinfo.mtime), 0o7606136617,
 
342
                    "wrong mtime for %s" % tarinfo.name)
 
343
            if not tarinfo.name.startswith("ustar/"):
 
344
                continue
 
345
            self.assertEqual(tarinfo.uname, "tarfile",
 
346
                    "wrong uname for %s" % tarinfo.name)
 
347
 
 
348
    def test_find_members(self):
 
349
        self.assertEqual(self.tar.getmembers()[-1].name, "misc/eof",
 
350
                "could not find all members")
 
351
 
 
352
    @unittest.skipUnless(hasattr(os, "link"),
 
353
                         "Missing hardlink implementation")
 
354
    @support.skip_unless_symlink
 
355
    def test_extract_hardlink(self):
 
356
        # Test hardlink extraction (e.g. bug #857297).
 
357
        with tarfile.open(tarname, errorlevel=1, encoding="iso8859-1") as tar:
 
358
            tar.extract("ustar/regtype", TEMPDIR)
 
359
            self.addCleanup(os.remove, os.path.join(TEMPDIR, "ustar/regtype"))
 
360
 
 
361
            tar.extract("ustar/lnktype", TEMPDIR)
 
362
            self.addCleanup(os.remove, os.path.join(TEMPDIR, "ustar/lnktype"))
 
363
            with open(os.path.join(TEMPDIR, "ustar/lnktype"), "rb") as f:
 
364
                data = f.read()
 
365
            self.assertEqual(md5sum(data), md5_regtype)
 
366
 
 
367
            tar.extract("ustar/symtype", TEMPDIR)
 
368
            self.addCleanup(os.remove, os.path.join(TEMPDIR, "ustar/symtype"))
 
369
            with open(os.path.join(TEMPDIR, "ustar/symtype"), "rb") as f:
 
370
                data = f.read()
 
371
            self.assertEqual(md5sum(data), md5_regtype)
 
372
 
 
373
    def test_extractall(self):
 
374
        # Test if extractall() correctly restores directory permissions
 
375
        # and times (see issue1735).
 
376
        tar = tarfile.open(tarname, encoding="iso8859-1")
 
377
        DIR = os.path.join(TEMPDIR, "extractall")
 
378
        os.mkdir(DIR)
 
379
        try:
 
380
            directories = [t for t in tar if t.isdir()]
 
381
            tar.extractall(DIR, directories)
 
382
            for tarinfo in directories:
 
383
                path = os.path.join(DIR, tarinfo.name)
 
384
                if sys.platform != "win32":
 
385
                    # Win32 has no support for fine grained permissions.
 
386
                    self.assertEqual(tarinfo.mode & 0o777,
 
387
                                     os.stat(path).st_mode & 0o777)
 
388
                def format_mtime(mtime):
 
389
                    if isinstance(mtime, float):
 
390
                        return "{} ({})".format(mtime, mtime.hex())
 
391
                    else:
 
392
                        return "{!r} (int)".format(mtime)
 
393
                file_mtime = os.path.getmtime(path)
 
394
                errmsg = "tar mtime {0} != file time {1} of path {2!a}".format(
 
395
                    format_mtime(tarinfo.mtime),
 
396
                    format_mtime(file_mtime),
 
397
                    path)
 
398
                self.assertEqual(tarinfo.mtime, file_mtime, errmsg)
 
399
        finally:
 
400
            tar.close()
 
401
            shutil.rmtree(DIR)
 
402
 
 
403
    def test_extract_directory(self):
 
404
        dirtype = "ustar/dirtype"
 
405
        DIR = os.path.join(TEMPDIR, "extractdir")
 
406
        os.mkdir(DIR)
 
407
        try:
 
408
            with tarfile.open(tarname, encoding="iso8859-1") as tar:
 
409
                tarinfo = tar.getmember(dirtype)
 
410
                tar.extract(tarinfo, path=DIR)
 
411
                extracted = os.path.join(DIR, dirtype)
 
412
                self.assertEqual(os.path.getmtime(extracted), tarinfo.mtime)
 
413
                if sys.platform != "win32":
 
414
                    self.assertEqual(os.stat(extracted).st_mode & 0o777, 0o755)
 
415
        finally:
 
416
            shutil.rmtree(DIR)
 
417
 
 
418
    def test_init_close_fobj(self):
 
419
        # Issue #7341: Close the internal file object in the TarFile
 
420
        # constructor in case of an error. For the test we rely on
 
421
        # the fact that opening an empty file raises a ReadError.
 
422
        empty = os.path.join(TEMPDIR, "empty")
 
423
        with open(empty, "wb") as fobj:
 
424
            fobj.write(b"")
 
425
 
 
426
        try:
 
427
            tar = object.__new__(tarfile.TarFile)
 
428
            try:
 
429
                tar.__init__(empty)
 
430
            except tarfile.ReadError:
 
431
                self.assertTrue(tar.fileobj.closed)
 
432
            else:
 
433
                self.fail("ReadError not raised")
 
434
        finally:
 
435
            support.unlink(empty)
 
436
 
 
437
    def test_parallel_iteration(self):
 
438
        # Issue #16601: Restarting iteration over tarfile continued
 
439
        # from where it left off.
 
440
        with tarfile.open(self.tarname) as tar:
 
441
            for m1, m2 in zip(tar, tar):
 
442
                self.assertEqual(m1.offset, m2.offset)
 
443
                self.assertEqual(m1.get_info(), m2.get_info())
 
444
 
 
445
class MiscReadTest(MiscReadTestBase, unittest.TestCase):
 
446
    test_fail_comp = None
 
447
 
 
448
class GzipMiscReadTest(GzipTest, MiscReadTestBase, unittest.TestCase):
 
449
    def test_non_existent_targz_file(self):
 
450
        # Test for issue11513: prevent non-existent gzipped tarfiles raising
 
451
        # multiple exceptions.
 
452
        with self.assertRaisesRegex(FileNotFoundError, "xxx"):
 
453
            tarfile.open("xxx", self.mode)
 
454
 
 
455
class Bz2MiscReadTest(Bz2Test, MiscReadTestBase, unittest.TestCase):
 
456
    def test_no_name_argument(self):
 
457
        self.skipTest("BZ2File have no name attribute")
 
458
 
 
459
class LzmaMiscReadTest(LzmaTest, MiscReadTestBase, unittest.TestCase):
 
460
    def test_no_name_argument(self):
 
461
        self.skipTest("LZMAFile have no name attribute")
 
462
 
 
463
 
 
464
class StreamReadTest(CommonReadTest, unittest.TestCase):
 
465
 
 
466
    prefix="r|"
 
467
 
 
468
    def test_read_through(self):
 
469
        # Issue #11224: A poorly designed _FileInFile.read() method
 
470
        # caused seeking errors with stream tar files.
 
471
        for tarinfo in self.tar:
 
472
            if not tarinfo.isreg():
 
473
                continue
 
474
            with self.tar.extractfile(tarinfo) as fobj:
 
475
                while True:
 
476
                    try:
 
477
                        buf = fobj.read(512)
 
478
                    except tarfile.StreamError:
 
479
                        self.fail("simple read-through using "
 
480
                                  "TarFile.extractfile() failed")
 
481
                    if not buf:
 
482
                        break
 
483
 
 
484
    def test_fileobj_regular_file(self):
 
485
        tarinfo = self.tar.next() # get "regtype" (can't use getmember)
 
486
        with self.tar.extractfile(tarinfo) as fobj:
 
487
            data = fobj.read()
 
488
        self.assertEqual(len(data), tarinfo.size,
 
489
                "regular file extraction failed")
 
490
        self.assertEqual(md5sum(data), md5_regtype,
 
491
                "regular file extraction failed")
 
492
 
 
493
    def test_provoke_stream_error(self):
 
494
        tarinfos = self.tar.getmembers()
 
495
        with self.tar.extractfile(tarinfos[0]) as f: # read the first member
 
496
            self.assertRaises(tarfile.StreamError, f.read)
 
497
 
 
498
    def test_compare_members(self):
 
499
        tar1 = tarfile.open(tarname, encoding="iso8859-1")
 
500
        try:
 
501
            tar2 = self.tar
 
502
 
 
503
            while True:
 
504
                t1 = tar1.next()
 
505
                t2 = tar2.next()
 
506
                if t1 is None:
 
507
                    break
 
508
                self.assertIsNotNone(t2, "stream.next() failed.")
 
509
 
 
510
                if t2.islnk() or t2.issym():
 
511
                    with self.assertRaises(tarfile.StreamError):
 
512
                        tar2.extractfile(t2)
 
513
                    continue
 
514
 
 
515
                v1 = tar1.extractfile(t1)
 
516
                v2 = tar2.extractfile(t2)
 
517
                if v1 is None:
 
518
                    continue
 
519
                self.assertIsNotNone(v2, "stream.extractfile() failed")
 
520
                self.assertEqual(v1.read(), v2.read(),
 
521
                        "stream extraction failed")
 
522
        finally:
 
523
            tar1.close()
 
524
 
 
525
class GzipStreamReadTest(GzipTest, StreamReadTest):
 
526
    pass
 
527
 
 
528
class Bz2StreamReadTest(Bz2Test, StreamReadTest):
 
529
    pass
 
530
 
 
531
class LzmaStreamReadTest(LzmaTest, StreamReadTest):
 
532
    pass
 
533
 
 
534
 
 
535
class DetectReadTest(TarTest, unittest.TestCase):
 
536
    def _testfunc_file(self, name, mode):
 
537
        try:
 
538
            tar = tarfile.open(name, mode)
 
539
        except tarfile.ReadError as e:
 
540
            self.fail()
 
541
        else:
 
542
            tar.close()
 
543
 
 
544
    def _testfunc_fileobj(self, name, mode):
 
545
        try:
 
546
            with open(name, "rb") as f:
 
547
                tar = tarfile.open(name, mode, fileobj=f)
 
548
        except tarfile.ReadError as e:
 
549
            self.fail()
 
550
        else:
 
551
            tar.close()
 
552
 
 
553
    def _test_modes(self, testfunc):
 
554
        if self.suffix:
 
555
            with self.assertRaises(tarfile.ReadError):
 
556
                tarfile.open(tarname, mode="r:" + self.suffix)
 
557
            with self.assertRaises(tarfile.ReadError):
 
558
                tarfile.open(tarname, mode="r|" + self.suffix)
 
559
            with self.assertRaises(tarfile.ReadError):
 
560
                tarfile.open(self.tarname, mode="r:")
 
561
            with self.assertRaises(tarfile.ReadError):
 
562
                tarfile.open(self.tarname, mode="r|")
 
563
        testfunc(self.tarname, "r")
 
564
        testfunc(self.tarname, "r:" + self.suffix)
 
565
        testfunc(self.tarname, "r:*")
 
566
        testfunc(self.tarname, "r|" + self.suffix)
 
567
        testfunc(self.tarname, "r|*")
 
568
 
 
569
    def test_detect_file(self):
 
570
        self._test_modes(self._testfunc_file)
 
571
 
 
572
    def test_detect_fileobj(self):
 
573
        self._test_modes(self._testfunc_fileobj)
 
574
 
 
575
class GzipDetectReadTest(GzipTest, DetectReadTest):
 
576
    pass
 
577
 
 
578
class Bz2DetectReadTest(Bz2Test, DetectReadTest):
 
579
    def test_detect_stream_bz2(self):
 
580
        # Originally, tarfile's stream detection looked for the string
 
581
        # "BZh91" at the start of the file. This is incorrect because
 
582
        # the '9' represents the blocksize (900kB). If the file was
 
583
        # compressed using another blocksize autodetection fails.
 
584
        with open(tarname, "rb") as fobj:
 
585
            data = fobj.read()
 
586
 
 
587
        # Compress with blocksize 100kB, the file starts with "BZh11".
 
588
        with bz2.BZ2File(tmpname, "wb", compresslevel=1) as fobj:
 
589
            fobj.write(data)
 
590
 
 
591
        self._testfunc_file(tmpname, "r|*")
 
592
 
 
593
class LzmaDetectReadTest(LzmaTest, DetectReadTest):
 
594
    pass
 
595
 
 
596
 
 
597
class MemberReadTest(ReadTest, unittest.TestCase):
 
598
 
 
599
    def _test_member(self, tarinfo, chksum=None, **kwargs):
 
600
        if chksum is not None:
 
601
            with self.tar.extractfile(tarinfo) as f:
 
602
                self.assertEqual(md5sum(f.read()), chksum,
 
603
                        "wrong md5sum for %s" % tarinfo.name)
 
604
 
 
605
        kwargs["mtime"] = 0o7606136617
 
606
        kwargs["uid"] = 1000
 
607
        kwargs["gid"] = 100
 
608
        if "old-v7" not in tarinfo.name:
 
609
            # V7 tar can't handle alphabetic owners.
 
610
            kwargs["uname"] = "tarfile"
 
611
            kwargs["gname"] = "tarfile"
 
612
        for k, v in kwargs.items():
 
613
            self.assertEqual(getattr(tarinfo, k), v,
 
614
                    "wrong value in %s field of %s" % (k, tarinfo.name))
 
615
 
 
616
    def test_find_regtype(self):
 
617
        tarinfo = self.tar.getmember("ustar/regtype")
 
618
        self._test_member(tarinfo, size=7011, chksum=md5_regtype)
 
619
 
 
620
    def test_find_conttype(self):
 
621
        tarinfo = self.tar.getmember("ustar/conttype")
 
622
        self._test_member(tarinfo, size=7011, chksum=md5_regtype)
 
623
 
 
624
    def test_find_dirtype(self):
 
625
        tarinfo = self.tar.getmember("ustar/dirtype")
 
626
        self._test_member(tarinfo, size=0)
 
627
 
 
628
    def test_find_dirtype_with_size(self):
 
629
        tarinfo = self.tar.getmember("ustar/dirtype-with-size")
 
630
        self._test_member(tarinfo, size=255)
 
631
 
 
632
    def test_find_lnktype(self):
 
633
        tarinfo = self.tar.getmember("ustar/lnktype")
 
634
        self._test_member(tarinfo, size=0, linkname="ustar/regtype")
 
635
 
 
636
    def test_find_symtype(self):
 
637
        tarinfo = self.tar.getmember("ustar/symtype")
 
638
        self._test_member(tarinfo, size=0, linkname="regtype")
 
639
 
 
640
    def test_find_blktype(self):
 
641
        tarinfo = self.tar.getmember("ustar/blktype")
 
642
        self._test_member(tarinfo, size=0, devmajor=3, devminor=0)
 
643
 
 
644
    def test_find_chrtype(self):
 
645
        tarinfo = self.tar.getmember("ustar/chrtype")
 
646
        self._test_member(tarinfo, size=0, devmajor=1, devminor=3)
 
647
 
 
648
    def test_find_fifotype(self):
 
649
        tarinfo = self.tar.getmember("ustar/fifotype")
 
650
        self._test_member(tarinfo, size=0)
 
651
 
 
652
    def test_find_sparse(self):
 
653
        tarinfo = self.tar.getmember("ustar/sparse")
 
654
        self._test_member(tarinfo, size=86016, chksum=md5_sparse)
 
655
 
 
656
    def test_find_gnusparse(self):
 
657
        tarinfo = self.tar.getmember("gnu/sparse")
 
658
        self._test_member(tarinfo, size=86016, chksum=md5_sparse)
 
659
 
 
660
    def test_find_gnusparse_00(self):
 
661
        tarinfo = self.tar.getmember("gnu/sparse-0.0")
 
662
        self._test_member(tarinfo, size=86016, chksum=md5_sparse)
 
663
 
 
664
    def test_find_gnusparse_01(self):
 
665
        tarinfo = self.tar.getmember("gnu/sparse-0.1")
 
666
        self._test_member(tarinfo, size=86016, chksum=md5_sparse)
 
667
 
 
668
    def test_find_gnusparse_10(self):
 
669
        tarinfo = self.tar.getmember("gnu/sparse-1.0")
 
670
        self._test_member(tarinfo, size=86016, chksum=md5_sparse)
 
671
 
 
672
    def test_find_umlauts(self):
 
673
        tarinfo = self.tar.getmember("ustar/umlauts-"
 
674
                                     "\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
 
675
        self._test_member(tarinfo, size=7011, chksum=md5_regtype)
 
676
 
 
677
    def test_find_ustar_longname(self):
 
678
        name = "ustar/" + "12345/" * 39 + "1234567/longname"
 
679
        self.assertIn(name, self.tar.getnames())
 
680
 
 
681
    def test_find_regtype_oldv7(self):
 
682
        tarinfo = self.tar.getmember("misc/regtype-old-v7")
 
683
        self._test_member(tarinfo, size=7011, chksum=md5_regtype)
 
684
 
 
685
    def test_find_pax_umlauts(self):
 
686
        self.tar.close()
 
687
        self.tar = tarfile.open(self.tarname, mode=self.mode,
 
688
                                encoding="iso8859-1")
 
689
        tarinfo = self.tar.getmember("pax/umlauts-"
 
690
                                     "\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
 
691
        self._test_member(tarinfo, size=7011, chksum=md5_regtype)
 
692
 
 
693
 
 
694
class LongnameTest:
 
695
 
 
696
    def test_read_longname(self):
 
697
        # Test reading of longname (bug #1471427).
 
698
        longname = self.subdir + "/" + "123/" * 125 + "longname"
 
699
        try:
 
700
            tarinfo = self.tar.getmember(longname)
 
701
        except KeyError:
 
702
            self.fail("longname not found")
 
703
        self.assertNotEqual(tarinfo.type, tarfile.DIRTYPE,
 
704
                "read longname as dirtype")
 
705
 
 
706
    def test_read_longlink(self):
 
707
        longname = self.subdir + "/" + "123/" * 125 + "longname"
 
708
        longlink = self.subdir + "/" + "123/" * 125 + "longlink"
 
709
        try:
 
710
            tarinfo = self.tar.getmember(longlink)
 
711
        except KeyError:
 
712
            self.fail("longlink not found")
 
713
        self.assertEqual(tarinfo.linkname, longname, "linkname wrong")
 
714
 
 
715
    def test_truncated_longname(self):
 
716
        longname = self.subdir + "/" + "123/" * 125 + "longname"
 
717
        tarinfo = self.tar.getmember(longname)
 
718
        offset = tarinfo.offset
 
719
        self.tar.fileobj.seek(offset)
 
720
        fobj = io.BytesIO(self.tar.fileobj.read(3 * 512))
 
721
        with self.assertRaises(tarfile.ReadError):
 
722
            tarfile.open(name="foo.tar", fileobj=fobj)
 
723
 
 
724
    def test_header_offset(self):
 
725
        # Test if the start offset of the TarInfo object includes
 
726
        # the preceding extended header.
 
727
        longname = self.subdir + "/" + "123/" * 125 + "longname"
 
728
        offset = self.tar.getmember(longname).offset
 
729
        with open(tarname, "rb") as fobj:
 
730
            fobj.seek(offset)
 
731
            tarinfo = tarfile.TarInfo.frombuf(fobj.read(512),
 
732
                                              "iso8859-1", "strict")
 
733
            self.assertEqual(tarinfo.type, self.longnametype)
 
734
 
 
735
 
 
736
class GNUReadTest(LongnameTest, ReadTest, unittest.TestCase):
 
737
 
 
738
    subdir = "gnu"
 
739
    longnametype = tarfile.GNUTYPE_LONGNAME
 
740
 
 
741
    # Since 3.2 tarfile is supposed to accurately restore sparse members and
 
742
    # produce files with holes. This is what we actually want to test here.
 
743
    # Unfortunately, not all platforms/filesystems support sparse files, and
 
744
    # even on platforms that do it is non-trivial to make reliable assertions
 
745
    # about holes in files. Therefore, we first do one basic test which works
 
746
    # an all platforms, and after that a test that will work only on
 
747
    # platforms/filesystems that prove to support sparse files.
 
748
    def _test_sparse_file(self, name):
 
749
        self.tar.extract(name, TEMPDIR)
 
750
        filename = os.path.join(TEMPDIR, name)
 
751
        with open(filename, "rb") as fobj:
 
752
            data = fobj.read()
 
753
        self.assertEqual(md5sum(data), md5_sparse,
 
754
                "wrong md5sum for %s" % name)
 
755
 
 
756
        if self._fs_supports_holes():
 
757
            s = os.stat(filename)
 
758
            self.assertLess(s.st_blocks * 512, s.st_size)
 
759
 
 
760
    def test_sparse_file_old(self):
 
761
        self._test_sparse_file("gnu/sparse")
 
762
 
 
763
    def test_sparse_file_00(self):
 
764
        self._test_sparse_file("gnu/sparse-0.0")
 
765
 
 
766
    def test_sparse_file_01(self):
 
767
        self._test_sparse_file("gnu/sparse-0.1")
 
768
 
 
769
    def test_sparse_file_10(self):
 
770
        self._test_sparse_file("gnu/sparse-1.0")
 
771
 
 
772
    @staticmethod
 
773
    def _fs_supports_holes():
 
774
        # Return True if the platform knows the st_blocks stat attribute and
 
775
        # uses st_blocks units of 512 bytes, and if the filesystem is able to
 
776
        # store holes in files.
 
777
        if sys.platform.startswith("linux"):
 
778
            # Linux evidentially has 512 byte st_blocks units.
 
779
            name = os.path.join(TEMPDIR, "sparse-test")
 
780
            with open(name, "wb") as fobj:
 
781
                fobj.seek(4096)
 
782
                fobj.truncate()
 
783
            s = os.stat(name)
 
784
            os.remove(name)
 
785
            return s.st_blocks == 0
 
786
        else:
 
787
            return False
 
788
 
 
789
 
 
790
class PaxReadTest(LongnameTest, ReadTest, unittest.TestCase):
 
791
 
 
792
    subdir = "pax"
 
793
    longnametype = tarfile.XHDTYPE
 
794
 
 
795
    def test_pax_global_headers(self):
 
796
        tar = tarfile.open(tarname, encoding="iso8859-1")
 
797
        try:
 
798
            tarinfo = tar.getmember("pax/regtype1")
 
799
            self.assertEqual(tarinfo.uname, "foo")
 
800
            self.assertEqual(tarinfo.gname, "bar")
 
801
            self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"),
 
802
                             "\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
 
803
 
 
804
            tarinfo = tar.getmember("pax/regtype2")
 
805
            self.assertEqual(tarinfo.uname, "")
 
806
            self.assertEqual(tarinfo.gname, "bar")
 
807
            self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"),
 
808
                             "\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
 
809
 
 
810
            tarinfo = tar.getmember("pax/regtype3")
 
811
            self.assertEqual(tarinfo.uname, "tarfile")
 
812
            self.assertEqual(tarinfo.gname, "tarfile")
 
813
            self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"),
 
814
                             "\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
 
815
        finally:
 
816
            tar.close()
 
817
 
 
818
    def test_pax_number_fields(self):
 
819
        # All following number fields are read from the pax header.
 
820
        tar = tarfile.open(tarname, encoding="iso8859-1")
 
821
        try:
 
822
            tarinfo = tar.getmember("pax/regtype4")
 
823
            self.assertEqual(tarinfo.size, 7011)
 
824
            self.assertEqual(tarinfo.uid, 123)
 
825
            self.assertEqual(tarinfo.gid, 123)
 
826
            self.assertEqual(tarinfo.mtime, 1041808783.0)
 
827
            self.assertEqual(type(tarinfo.mtime), float)
 
828
            self.assertEqual(float(tarinfo.pax_headers["atime"]), 1041808783.0)
 
829
            self.assertEqual(float(tarinfo.pax_headers["ctime"]), 1041808783.0)
 
830
        finally:
 
831
            tar.close()
 
832
 
 
833
 
 
834
class WriteTestBase(TarTest):
 
835
    # Put all write tests in here that are supposed to be tested
 
836
    # in all possible mode combinations.
 
837
 
 
838
    def test_fileobj_no_close(self):
 
839
        fobj = io.BytesIO()
 
840
        tar = tarfile.open(fileobj=fobj, mode=self.mode)
 
841
        tar.addfile(tarfile.TarInfo("foo"))
 
842
        tar.close()
 
843
        self.assertFalse(fobj.closed, "external fileobjs must never closed")
 
844
 
 
845
 
 
846
class WriteTest(WriteTestBase, unittest.TestCase):
 
847
 
 
848
    prefix = "w:"
 
849
 
 
850
    def test_100_char_name(self):
 
851
        # The name field in a tar header stores strings of at most 100 chars.
 
852
        # If a string is shorter than 100 chars it has to be padded with '\0',
 
853
        # which implies that a string of exactly 100 chars is stored without
 
854
        # a trailing '\0'.
 
855
        name = "0123456789" * 10
 
856
        tar = tarfile.open(tmpname, self.mode)
 
857
        try:
 
858
            t = tarfile.TarInfo(name)
 
859
            tar.addfile(t)
 
860
        finally:
 
861
            tar.close()
 
862
 
 
863
        tar = tarfile.open(tmpname)
 
864
        try:
 
865
            self.assertEqual(tar.getnames()[0], name,
 
866
                    "failed to store 100 char filename")
 
867
        finally:
 
868
            tar.close()
 
869
 
 
870
    def test_tar_size(self):
 
871
        # Test for bug #1013882.
 
872
        tar = tarfile.open(tmpname, self.mode)
 
873
        try:
 
874
            path = os.path.join(TEMPDIR, "file")
 
875
            with open(path, "wb") as fobj:
 
876
                fobj.write(b"aaa")
 
877
            tar.add(path)
 
878
        finally:
 
879
            tar.close()
 
880
        self.assertGreater(os.path.getsize(tmpname), 0,
 
881
                "tarfile is empty")
 
882
 
 
883
    # The test_*_size tests test for bug #1167128.
 
884
    def test_file_size(self):
 
885
        tar = tarfile.open(tmpname, self.mode)
 
886
        try:
 
887
            path = os.path.join(TEMPDIR, "file")
 
888
            with open(path, "wb"):
 
889
                pass
 
890
            tarinfo = tar.gettarinfo(path)
 
891
            self.assertEqual(tarinfo.size, 0)
 
892
 
 
893
            with open(path, "wb") as fobj:
 
894
                fobj.write(b"aaa")
 
895
            tarinfo = tar.gettarinfo(path)
 
896
            self.assertEqual(tarinfo.size, 3)
 
897
        finally:
 
898
            tar.close()
 
899
 
 
900
    def test_directory_size(self):
 
901
        path = os.path.join(TEMPDIR, "directory")
 
902
        os.mkdir(path)
 
903
        try:
 
904
            tar = tarfile.open(tmpname, self.mode)
 
905
            try:
 
906
                tarinfo = tar.gettarinfo(path)
 
907
                self.assertEqual(tarinfo.size, 0)
 
908
            finally:
 
909
                tar.close()
 
910
        finally:
 
911
            os.rmdir(path)
 
912
 
 
913
    @unittest.skipUnless(hasattr(os, "link"),
 
914
                         "Missing hardlink implementation")
 
915
    def test_link_size(self):
 
916
        link = os.path.join(TEMPDIR, "link")
 
917
        target = os.path.join(TEMPDIR, "link_target")
 
918
        with open(target, "wb") as fobj:
 
919
            fobj.write(b"aaa")
 
920
        os.link(target, link)
 
921
        try:
 
922
            tar = tarfile.open(tmpname, self.mode)
 
923
            try:
 
924
                # Record the link target in the inodes list.
 
925
                tar.gettarinfo(target)
 
926
                tarinfo = tar.gettarinfo(link)
 
927
                self.assertEqual(tarinfo.size, 0)
 
928
            finally:
 
929
                tar.close()
 
930
        finally:
 
931
            os.remove(target)
 
932
            os.remove(link)
 
933
 
 
934
    @support.skip_unless_symlink
 
935
    def test_symlink_size(self):
 
936
        path = os.path.join(TEMPDIR, "symlink")
 
937
        os.symlink("link_target", path)
 
938
        try:
 
939
            tar = tarfile.open(tmpname, self.mode)
 
940
            try:
 
941
                tarinfo = tar.gettarinfo(path)
 
942
                self.assertEqual(tarinfo.size, 0)
 
943
            finally:
 
944
                tar.close()
 
945
        finally:
 
946
            os.remove(path)
 
947
 
 
948
    def test_add_self(self):
 
949
        # Test for #1257255.
 
950
        dstname = os.path.abspath(tmpname)
 
951
        tar = tarfile.open(tmpname, self.mode)
 
952
        try:
 
953
            self.assertEqual(tar.name, dstname,
 
954
                    "archive name must be absolute")
 
955
            tar.add(dstname)
 
956
            self.assertEqual(tar.getnames(), [],
 
957
                    "added the archive to itself")
 
958
 
 
959
            cwd = os.getcwd()
 
960
            os.chdir(TEMPDIR)
 
961
            tar.add(dstname)
 
962
            os.chdir(cwd)
 
963
            self.assertEqual(tar.getnames(), [],
 
964
                    "added the archive to itself")
 
965
        finally:
 
966
            tar.close()
 
967
 
 
968
    def test_exclude(self):
 
969
        tempdir = os.path.join(TEMPDIR, "exclude")
 
970
        os.mkdir(tempdir)
 
971
        try:
 
972
            for name in ("foo", "bar", "baz"):
 
973
                name = os.path.join(tempdir, name)
 
974
                support.create_empty_file(name)
 
975
 
 
976
            exclude = os.path.isfile
 
977
 
 
978
            tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1")
 
979
            try:
 
980
                with support.check_warnings(("use the filter argument",
 
981
                                             DeprecationWarning)):
 
982
                    tar.add(tempdir, arcname="empty_dir", exclude=exclude)
 
983
            finally:
 
984
                tar.close()
 
985
 
 
986
            tar = tarfile.open(tmpname, "r")
 
987
            try:
 
988
                self.assertEqual(len(tar.getmembers()), 1)
 
989
                self.assertEqual(tar.getnames()[0], "empty_dir")
 
990
            finally:
 
991
                tar.close()
 
992
        finally:
 
993
            shutil.rmtree(tempdir)
 
994
 
 
995
    def test_filter(self):
 
996
        tempdir = os.path.join(TEMPDIR, "filter")
 
997
        os.mkdir(tempdir)
 
998
        try:
 
999
            for name in ("foo", "bar", "baz"):
 
1000
                name = os.path.join(tempdir, name)
 
1001
                support.create_empty_file(name)
 
1002
 
 
1003
            def filter(tarinfo):
 
1004
                if os.path.basename(tarinfo.name) == "bar":
 
1005
                    return
 
1006
                tarinfo.uid = 123
 
1007
                tarinfo.uname = "foo"
 
1008
                return tarinfo
 
1009
 
 
1010
            tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1")
 
1011
            try:
 
1012
                tar.add(tempdir, arcname="empty_dir", filter=filter)
 
1013
            finally:
 
1014
                tar.close()
 
1015
 
 
1016
            # Verify that filter is a keyword-only argument
 
1017
            with self.assertRaises(TypeError):
 
1018
                tar.add(tempdir, "empty_dir", True, None, filter)
 
1019
 
 
1020
            tar = tarfile.open(tmpname, "r")
 
1021
            try:
 
1022
                for tarinfo in tar:
 
1023
                    self.assertEqual(tarinfo.uid, 123)
 
1024
                    self.assertEqual(tarinfo.uname, "foo")
 
1025
                self.assertEqual(len(tar.getmembers()), 3)
 
1026
            finally:
 
1027
                tar.close()
 
1028
        finally:
 
1029
            shutil.rmtree(tempdir)
 
1030
 
 
1031
    # Guarantee that stored pathnames are not modified. Don't
 
1032
    # remove ./ or ../ or double slashes. Still make absolute
 
1033
    # pathnames relative.
 
1034
    # For details see bug #6054.
 
1035
    def _test_pathname(self, path, cmp_path=None, dir=False):
 
1036
        # Create a tarfile with an empty member named path
 
1037
        # and compare the stored name with the original.
 
1038
        foo = os.path.join(TEMPDIR, "foo")
 
1039
        if not dir:
 
1040
            support.create_empty_file(foo)
 
1041
        else:
 
1042
            os.mkdir(foo)
 
1043
 
 
1044
        tar = tarfile.open(tmpname, self.mode)
 
1045
        try:
 
1046
            tar.add(foo, arcname=path)
 
1047
        finally:
 
1048
            tar.close()
 
1049
 
 
1050
        tar = tarfile.open(tmpname, "r")
 
1051
        try:
 
1052
            t = tar.next()
 
1053
        finally:
 
1054
            tar.close()
 
1055
 
 
1056
        if not dir:
 
1057
            os.remove(foo)
 
1058
        else:
 
1059
            os.rmdir(foo)
 
1060
 
 
1061
        self.assertEqual(t.name, cmp_path or path.replace(os.sep, "/"))
 
1062
 
 
1063
 
 
1064
    @support.skip_unless_symlink
 
1065
    def test_extractall_symlinks(self):
 
1066
        # Test if extractall works properly when tarfile contains symlinks
 
1067
        tempdir = os.path.join(TEMPDIR, "testsymlinks")
 
1068
        temparchive = os.path.join(TEMPDIR, "testsymlinks.tar")
 
1069
        os.mkdir(tempdir)
 
1070
        try:
 
1071
            source_file = os.path.join(tempdir,'source')
 
1072
            target_file = os.path.join(tempdir,'symlink')
 
1073
            with open(source_file,'w') as f:
 
1074
                f.write('something\n')
 
1075
            os.symlink(source_file, target_file)
 
1076
            tar = tarfile.open(temparchive,'w')
 
1077
            tar.add(source_file)
 
1078
            tar.add(target_file)
 
1079
            tar.close()
 
1080
            # Let's extract it to the location which contains the symlink
 
1081
            tar = tarfile.open(temparchive,'r')
 
1082
            # this should not raise OSError: [Errno 17] File exists
 
1083
            try:
 
1084
                tar.extractall(path=tempdir)
 
1085
            except OSError:
 
1086
                self.fail("extractall failed with symlinked files")
 
1087
            finally:
 
1088
                tar.close()
 
1089
        finally:
 
1090
            os.unlink(temparchive)
 
1091
            shutil.rmtree(tempdir)
 
1092
 
 
1093
    def test_pathnames(self):
 
1094
        self._test_pathname("foo")
 
1095
        self._test_pathname(os.path.join("foo", ".", "bar"))
 
1096
        self._test_pathname(os.path.join("foo", "..", "bar"))
 
1097
        self._test_pathname(os.path.join(".", "foo"))
 
1098
        self._test_pathname(os.path.join(".", "foo", "."))
 
1099
        self._test_pathname(os.path.join(".", "foo", ".", "bar"))
 
1100
        self._test_pathname(os.path.join(".", "foo", "..", "bar"))
 
1101
        self._test_pathname(os.path.join(".", "foo", "..", "bar"))
 
1102
        self._test_pathname(os.path.join("..", "foo"))
 
1103
        self._test_pathname(os.path.join("..", "foo", ".."))
 
1104
        self._test_pathname(os.path.join("..", "foo", ".", "bar"))
 
1105
        self._test_pathname(os.path.join("..", "foo", "..", "bar"))
 
1106
 
 
1107
        self._test_pathname("foo" + os.sep + os.sep + "bar")
 
1108
        self._test_pathname("foo" + os.sep + os.sep, "foo", dir=True)
 
1109
 
 
1110
    def test_abs_pathnames(self):
 
1111
        if sys.platform == "win32":
 
1112
            self._test_pathname("C:\\foo", "foo")
 
1113
        else:
 
1114
            self._test_pathname("/foo", "foo")
 
1115
            self._test_pathname("///foo", "foo")
 
1116
 
 
1117
    def test_cwd(self):
 
1118
        # Test adding the current working directory.
 
1119
        cwd = os.getcwd()
 
1120
        os.chdir(TEMPDIR)
 
1121
        try:
 
1122
            tar = tarfile.open(tmpname, self.mode)
 
1123
            try:
 
1124
                tar.add(".")
 
1125
            finally:
 
1126
                tar.close()
 
1127
 
 
1128
            tar = tarfile.open(tmpname, "r")
 
1129
            try:
 
1130
                for t in tar:
 
1131
                    if t.name != ".":
 
1132
                        self.assertTrue(t.name.startswith("./"), t.name)
 
1133
            finally:
 
1134
                tar.close()
 
1135
        finally:
 
1136
            os.chdir(cwd)
 
1137
 
 
1138
class GzipWriteTest(GzipTest, WriteTest):
 
1139
    pass
 
1140
 
 
1141
class Bz2WriteTest(Bz2Test, WriteTest):
 
1142
    pass
 
1143
 
 
1144
class LzmaWriteTest(LzmaTest, WriteTest):
 
1145
    pass
 
1146
 
 
1147
 
 
1148
class StreamWriteTest(WriteTestBase, unittest.TestCase):
 
1149
 
 
1150
    prefix = "w|"
 
1151
    decompressor = None
 
1152
 
 
1153
    def test_stream_padding(self):
 
1154
        # Test for bug #1543303.
 
1155
        tar = tarfile.open(tmpname, self.mode)
 
1156
        tar.close()
 
1157
        if self.decompressor:
 
1158
            dec = self.decompressor()
 
1159
            with open(tmpname, "rb") as fobj:
 
1160
                data = fobj.read()
 
1161
            data = dec.decompress(data)
 
1162
            self.assertFalse(dec.unused_data, "found trailing data")
 
1163
        else:
 
1164
            with self.open(tmpname) as fobj:
 
1165
                data = fobj.read()
 
1166
        self.assertEqual(data.count(b"\0"), tarfile.RECORDSIZE,
 
1167
                        "incorrect zero padding")
 
1168
 
 
1169
    @unittest.skipUnless(sys.platform != "win32" and hasattr(os, "umask"),
 
1170
                         "Missing umask implementation")
 
1171
    def test_file_mode(self):
 
1172
        # Test for issue #8464: Create files with correct
 
1173
        # permissions.
 
1174
        if os.path.exists(tmpname):
 
1175
            os.remove(tmpname)
 
1176
 
 
1177
        original_umask = os.umask(0o022)
 
1178
        try:
 
1179
            tar = tarfile.open(tmpname, self.mode)
 
1180
            tar.close()
 
1181
            mode = os.stat(tmpname).st_mode & 0o777
 
1182
            self.assertEqual(mode, 0o644, "wrong file permissions")
 
1183
        finally:
 
1184
            os.umask(original_umask)
 
1185
 
 
1186
class GzipStreamWriteTest(GzipTest, StreamWriteTest):
 
1187
    pass
 
1188
 
 
1189
class Bz2StreamWriteTest(Bz2Test, StreamWriteTest):
 
1190
    decompressor = bz2.BZ2Decompressor if bz2 else None
 
1191
 
 
1192
class LzmaStreamWriteTest(LzmaTest, StreamWriteTest):
 
1193
    decompressor = lzma.LZMADecompressor if lzma else None
 
1194
 
 
1195
 
 
1196
class GNUWriteTest(unittest.TestCase):
 
1197
    # This testcase checks for correct creation of GNU Longname
 
1198
    # and Longlink extended headers (cp. bug #812325).
 
1199
 
 
1200
    def _length(self, s):
 
1201
        blocks = len(s) // 512 + 1
 
1202
        return blocks * 512
 
1203
 
 
1204
    def _calc_size(self, name, link=None):
 
1205
        # Initial tar header
 
1206
        count = 512
 
1207
 
 
1208
        if len(name) > tarfile.LENGTH_NAME:
 
1209
            # GNU longname extended header + longname
 
1210
            count += 512
 
1211
            count += self._length(name)
 
1212
        if link is not None and len(link) > tarfile.LENGTH_LINK:
 
1213
            # GNU longlink extended header + longlink
 
1214
            count += 512
 
1215
            count += self._length(link)
 
1216
        return count
 
1217
 
 
1218
    def _test(self, name, link=None):
 
1219
        tarinfo = tarfile.TarInfo(name)
 
1220
        if link:
 
1221
            tarinfo.linkname = link
 
1222
            tarinfo.type = tarfile.LNKTYPE
 
1223
 
 
1224
        tar = tarfile.open(tmpname, "w")
 
1225
        try:
 
1226
            tar.format = tarfile.GNU_FORMAT
 
1227
            tar.addfile(tarinfo)
 
1228
 
 
1229
            v1 = self._calc_size(name, link)
 
1230
            v2 = tar.offset
 
1231
            self.assertEqual(v1, v2, "GNU longname/longlink creation failed")
 
1232
        finally:
 
1233
            tar.close()
 
1234
 
 
1235
        tar = tarfile.open(tmpname)
 
1236
        try:
 
1237
            member = tar.next()
 
1238
            self.assertIsNotNone(member,
 
1239
                    "unable to read longname member")
 
1240
            self.assertEqual(tarinfo.name, member.name,
 
1241
                    "unable to read longname member")
 
1242
            self.assertEqual(tarinfo.linkname, member.linkname,
 
1243
                    "unable to read longname member")
 
1244
        finally:
 
1245
            tar.close()
 
1246
 
 
1247
    def test_longname_1023(self):
 
1248
        self._test(("longnam/" * 127) + "longnam")
 
1249
 
 
1250
    def test_longname_1024(self):
 
1251
        self._test(("longnam/" * 127) + "longname")
 
1252
 
 
1253
    def test_longname_1025(self):
 
1254
        self._test(("longnam/" * 127) + "longname_")
 
1255
 
 
1256
    def test_longlink_1023(self):
 
1257
        self._test("name", ("longlnk/" * 127) + "longlnk")
 
1258
 
 
1259
    def test_longlink_1024(self):
 
1260
        self._test("name", ("longlnk/" * 127) + "longlink")
 
1261
 
 
1262
    def test_longlink_1025(self):
 
1263
        self._test("name", ("longlnk/" * 127) + "longlink_")
 
1264
 
 
1265
    def test_longnamelink_1023(self):
 
1266
        self._test(("longnam/" * 127) + "longnam",
 
1267
                   ("longlnk/" * 127) + "longlnk")
 
1268
 
 
1269
    def test_longnamelink_1024(self):
 
1270
        self._test(("longnam/" * 127) + "longname",
 
1271
                   ("longlnk/" * 127) + "longlink")
 
1272
 
 
1273
    def test_longnamelink_1025(self):
 
1274
        self._test(("longnam/" * 127) + "longname_",
 
1275
                   ("longlnk/" * 127) + "longlink_")
 
1276
 
 
1277
 
 
1278
@unittest.skipUnless(hasattr(os, "link"), "Missing hardlink implementation")
 
1279
class HardlinkTest(unittest.TestCase):
 
1280
    # Test the creation of LNKTYPE (hardlink) members in an archive.
 
1281
 
 
1282
    def setUp(self):
 
1283
        self.foo = os.path.join(TEMPDIR, "foo")
 
1284
        self.bar = os.path.join(TEMPDIR, "bar")
 
1285
 
 
1286
        with open(self.foo, "wb") as fobj:
 
1287
            fobj.write(b"foo")
 
1288
 
 
1289
        os.link(self.foo, self.bar)
 
1290
 
 
1291
        self.tar = tarfile.open(tmpname, "w")
 
1292
        self.tar.add(self.foo)
 
1293
 
 
1294
    def tearDown(self):
 
1295
        self.tar.close()
 
1296
        support.unlink(self.foo)
 
1297
        support.unlink(self.bar)
 
1298
 
 
1299
    def test_add_twice(self):
 
1300
        # The same name will be added as a REGTYPE every
 
1301
        # time regardless of st_nlink.
 
1302
        tarinfo = self.tar.gettarinfo(self.foo)
 
1303
        self.assertEqual(tarinfo.type, tarfile.REGTYPE,
 
1304
                "add file as regular failed")
 
1305
 
 
1306
    def test_add_hardlink(self):
 
1307
        tarinfo = self.tar.gettarinfo(self.bar)
 
1308
        self.assertEqual(tarinfo.type, tarfile.LNKTYPE,
 
1309
                "add file as hardlink failed")
 
1310
 
 
1311
    def test_dereference_hardlink(self):
 
1312
        self.tar.dereference = True
 
1313
        tarinfo = self.tar.gettarinfo(self.bar)
 
1314
        self.assertEqual(tarinfo.type, tarfile.REGTYPE,
 
1315
                "dereferencing hardlink failed")
 
1316
 
 
1317
 
 
1318
class PaxWriteTest(GNUWriteTest):
 
1319
 
 
1320
    def _test(self, name, link=None):
 
1321
        # See GNUWriteTest.
 
1322
        tarinfo = tarfile.TarInfo(name)
 
1323
        if link:
 
1324
            tarinfo.linkname = link
 
1325
            tarinfo.type = tarfile.LNKTYPE
 
1326
 
 
1327
        tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT)
 
1328
        try:
 
1329
            tar.addfile(tarinfo)
 
1330
        finally:
 
1331
            tar.close()
 
1332
 
 
1333
        tar = tarfile.open(tmpname)
 
1334
        try:
 
1335
            if link:
 
1336
                l = tar.getmembers()[0].linkname
 
1337
                self.assertEqual(link, l, "PAX longlink creation failed")
 
1338
            else:
 
1339
                n = tar.getmembers()[0].name
 
1340
                self.assertEqual(name, n, "PAX longname creation failed")
 
1341
        finally:
 
1342
            tar.close()
 
1343
 
 
1344
    def test_pax_global_header(self):
 
1345
        pax_headers = {
 
1346
                "foo": "bar",
 
1347
                "uid": "0",
 
1348
                "mtime": "1.23",
 
1349
                "test": "\xe4\xf6\xfc",
 
1350
                "\xe4\xf6\xfc": "test"}
 
1351
 
 
1352
        tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
 
1353
                pax_headers=pax_headers)
 
1354
        try:
 
1355
            tar.addfile(tarfile.TarInfo("test"))
 
1356
        finally:
 
1357
            tar.close()
 
1358
 
 
1359
        # Test if the global header was written correctly.
 
1360
        tar = tarfile.open(tmpname, encoding="iso8859-1")
 
1361
        try:
 
1362
            self.assertEqual(tar.pax_headers, pax_headers)
 
1363
            self.assertEqual(tar.getmembers()[0].pax_headers, pax_headers)
 
1364
            # Test if all the fields are strings.
 
1365
            for key, val in tar.pax_headers.items():
 
1366
                self.assertIsNot(type(key), bytes)
 
1367
                self.assertIsNot(type(val), bytes)
 
1368
                if key in tarfile.PAX_NUMBER_FIELDS:
 
1369
                    try:
 
1370
                        tarfile.PAX_NUMBER_FIELDS[key](val)
 
1371
                    except (TypeError, ValueError):
 
1372
                        self.fail("unable to convert pax header field")
 
1373
        finally:
 
1374
            tar.close()
 
1375
 
 
1376
    def test_pax_extended_header(self):
 
1377
        # The fields from the pax header have priority over the
 
1378
        # TarInfo.
 
1379
        pax_headers = {"path": "foo", "uid": "123"}
 
1380
 
 
1381
        tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
 
1382
                           encoding="iso8859-1")
 
1383
        try:
 
1384
            t = tarfile.TarInfo()
 
1385
            t.name = "\xe4\xf6\xfc" # non-ASCII
 
1386
            t.uid = 8**8 # too large
 
1387
            t.pax_headers = pax_headers
 
1388
            tar.addfile(t)
 
1389
        finally:
 
1390
            tar.close()
 
1391
 
 
1392
        tar = tarfile.open(tmpname, encoding="iso8859-1")
 
1393
        try:
 
1394
            t = tar.getmembers()[0]
 
1395
            self.assertEqual(t.pax_headers, pax_headers)
 
1396
            self.assertEqual(t.name, "foo")
 
1397
            self.assertEqual(t.uid, 123)
 
1398
        finally:
 
1399
            tar.close()
 
1400
 
 
1401
 
 
1402
class UstarUnicodeTest(unittest.TestCase):
 
1403
 
 
1404
    format = tarfile.USTAR_FORMAT
 
1405
 
 
1406
    def test_iso8859_1_filename(self):
 
1407
        self._test_unicode_filename("iso8859-1")
 
1408
 
 
1409
    def test_utf7_filename(self):
 
1410
        self._test_unicode_filename("utf7")
 
1411
 
 
1412
    def test_utf8_filename(self):
 
1413
        self._test_unicode_filename("utf-8")
 
1414
 
 
1415
    def _test_unicode_filename(self, encoding):
 
1416
        tar = tarfile.open(tmpname, "w", format=self.format,
 
1417
                           encoding=encoding, errors="strict")
 
1418
        try:
 
1419
            name = "\xe4\xf6\xfc"
 
1420
            tar.addfile(tarfile.TarInfo(name))
 
1421
        finally:
 
1422
            tar.close()
 
1423
 
 
1424
        tar = tarfile.open(tmpname, encoding=encoding)
 
1425
        try:
 
1426
            self.assertEqual(tar.getmembers()[0].name, name)
 
1427
        finally:
 
1428
            tar.close()
 
1429
 
 
1430
    def test_unicode_filename_error(self):
 
1431
        tar = tarfile.open(tmpname, "w", format=self.format,
 
1432
                           encoding="ascii", errors="strict")
 
1433
        try:
 
1434
            tarinfo = tarfile.TarInfo()
 
1435
 
 
1436
            tarinfo.name = "\xe4\xf6\xfc"
 
1437
            self.assertRaises(UnicodeError, tar.addfile, tarinfo)
 
1438
 
 
1439
            tarinfo.name = "foo"
 
1440
            tarinfo.uname = "\xe4\xf6\xfc"
 
1441
            self.assertRaises(UnicodeError, tar.addfile, tarinfo)
 
1442
        finally:
 
1443
            tar.close()
 
1444
 
 
1445
    def test_unicode_argument(self):
 
1446
        tar = tarfile.open(tarname, "r",
 
1447
                           encoding="iso8859-1", errors="strict")
 
1448
        try:
 
1449
            for t in tar:
 
1450
                self.assertIs(type(t.name), str)
 
1451
                self.assertIs(type(t.linkname), str)
 
1452
                self.assertIs(type(t.uname), str)
 
1453
                self.assertIs(type(t.gname), str)
 
1454
        finally:
 
1455
            tar.close()
 
1456
 
 
1457
    def test_uname_unicode(self):
 
1458
        t = tarfile.TarInfo("foo")
 
1459
        t.uname = "\xe4\xf6\xfc"
 
1460
        t.gname = "\xe4\xf6\xfc"
 
1461
 
 
1462
        tar = tarfile.open(tmpname, mode="w", format=self.format,
 
1463
                           encoding="iso8859-1")
 
1464
        try:
 
1465
            tar.addfile(t)
 
1466
        finally:
 
1467
            tar.close()
 
1468
 
 
1469
        tar = tarfile.open(tmpname, encoding="iso8859-1")
 
1470
        try:
 
1471
            t = tar.getmember("foo")
 
1472
            self.assertEqual(t.uname, "\xe4\xf6\xfc")
 
1473
            self.assertEqual(t.gname, "\xe4\xf6\xfc")
 
1474
 
 
1475
            if self.format != tarfile.PAX_FORMAT:
 
1476
                tar.close()
 
1477
                tar = tarfile.open(tmpname, encoding="ascii")
 
1478
                t = tar.getmember("foo")
 
1479
                self.assertEqual(t.uname, "\udce4\udcf6\udcfc")
 
1480
                self.assertEqual(t.gname, "\udce4\udcf6\udcfc")
 
1481
        finally:
 
1482
            tar.close()
 
1483
 
 
1484
 
 
1485
class GNUUnicodeTest(UstarUnicodeTest):
 
1486
 
 
1487
    format = tarfile.GNU_FORMAT
 
1488
 
 
1489
    def test_bad_pax_header(self):
 
1490
        # Test for issue #8633. GNU tar <= 1.23 creates raw binary fields
 
1491
        # without a hdrcharset=BINARY header.
 
1492
        for encoding, name in (
 
1493
                ("utf-8", "pax/bad-pax-\udce4\udcf6\udcfc"),
 
1494
                ("iso8859-1", "pax/bad-pax-\xe4\xf6\xfc"),):
 
1495
            with tarfile.open(tarname, encoding=encoding,
 
1496
                              errors="surrogateescape") as tar:
 
1497
                try:
 
1498
                    t = tar.getmember(name)
 
1499
                except KeyError:
 
1500
                    self.fail("unable to read bad GNU tar pax header")
 
1501
 
 
1502
 
 
1503
class PAXUnicodeTest(UstarUnicodeTest):
 
1504
 
 
1505
    format = tarfile.PAX_FORMAT
 
1506
 
 
1507
    # PAX_FORMAT ignores encoding in write mode.
 
1508
    test_unicode_filename_error = None
 
1509
 
 
1510
    def test_binary_header(self):
 
1511
        # Test a POSIX.1-2008 compatible header with a hdrcharset=BINARY field.
 
1512
        for encoding, name in (
 
1513
                ("utf-8", "pax/hdrcharset-\udce4\udcf6\udcfc"),
 
1514
                ("iso8859-1", "pax/hdrcharset-\xe4\xf6\xfc"),):
 
1515
            with tarfile.open(tarname, encoding=encoding,
 
1516
                              errors="surrogateescape") as tar:
 
1517
                try:
 
1518
                    t = tar.getmember(name)
 
1519
                except KeyError:
 
1520
                    self.fail("unable to read POSIX.1-2008 binary header")
 
1521
 
 
1522
 
 
1523
class AppendTestBase:
 
1524
    # Test append mode (cp. patch #1652681).
 
1525
 
 
1526
    def setUp(self):
 
1527
        self.tarname = tmpname
 
1528
        if os.path.exists(self.tarname):
 
1529
            os.remove(self.tarname)
 
1530
 
 
1531
    def _create_testtar(self, mode="w:"):
 
1532
        with tarfile.open(tarname, encoding="iso8859-1") as src:
 
1533
            t = src.getmember("ustar/regtype")
 
1534
            t.name = "foo"
 
1535
            with src.extractfile(t) as f:
 
1536
                with tarfile.open(self.tarname, mode) as tar:
 
1537
                    tar.addfile(t, f)
 
1538
 
 
1539
    def test_append_compressed(self):
 
1540
        self._create_testtar("w:" + self.suffix)
 
1541
        self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a")
 
1542
 
 
1543
class AppendTest(AppendTestBase, unittest.TestCase):
 
1544
    test_append_compressed = None
 
1545
 
 
1546
    def _add_testfile(self, fileobj=None):
 
1547
        with tarfile.open(self.tarname, "a", fileobj=fileobj) as tar:
 
1548
            tar.addfile(tarfile.TarInfo("bar"))
 
1549
 
 
1550
    def _test(self, names=["bar"], fileobj=None):
 
1551
        with tarfile.open(self.tarname, fileobj=fileobj) as tar:
 
1552
            self.assertEqual(tar.getnames(), names)
 
1553
 
 
1554
    def test_non_existing(self):
 
1555
        self._add_testfile()
 
1556
        self._test()
 
1557
 
 
1558
    def test_empty(self):
 
1559
        tarfile.open(self.tarname, "w:").close()
 
1560
        self._add_testfile()
 
1561
        self._test()
 
1562
 
 
1563
    def test_empty_fileobj(self):
 
1564
        fobj = io.BytesIO(b"\0" * 1024)
 
1565
        self._add_testfile(fobj)
 
1566
        fobj.seek(0)
 
1567
        self._test(fileobj=fobj)
 
1568
 
 
1569
    def test_fileobj(self):
 
1570
        self._create_testtar()
 
1571
        with open(self.tarname, "rb") as fobj:
 
1572
            data = fobj.read()
 
1573
        fobj = io.BytesIO(data)
 
1574
        self._add_testfile(fobj)
 
1575
        fobj.seek(0)
 
1576
        self._test(names=["foo", "bar"], fileobj=fobj)
 
1577
 
 
1578
    def test_existing(self):
 
1579
        self._create_testtar()
 
1580
        self._add_testfile()
 
1581
        self._test(names=["foo", "bar"])
 
1582
 
 
1583
    # Append mode is supposed to fail if the tarfile to append to
 
1584
    # does not end with a zero block.
 
1585
    def _test_error(self, data):
 
1586
        with open(self.tarname, "wb") as fobj:
 
1587
            fobj.write(data)
 
1588
        self.assertRaises(tarfile.ReadError, self._add_testfile)
 
1589
 
 
1590
    def test_null(self):
 
1591
        self._test_error(b"")
 
1592
 
 
1593
    def test_incomplete(self):
 
1594
        self._test_error(b"\0" * 13)
 
1595
 
 
1596
    def test_premature_eof(self):
 
1597
        data = tarfile.TarInfo("foo").tobuf()
 
1598
        self._test_error(data)
 
1599
 
 
1600
    def test_trailing_garbage(self):
 
1601
        data = tarfile.TarInfo("foo").tobuf()
 
1602
        self._test_error(data + b"\0" * 13)
 
1603
 
 
1604
    def test_invalid(self):
 
1605
        self._test_error(b"a" * 512)
 
1606
 
 
1607
class GzipAppendTest(GzipTest, AppendTestBase, unittest.TestCase):
 
1608
    pass
 
1609
 
 
1610
class Bz2AppendTest(Bz2Test, AppendTestBase, unittest.TestCase):
 
1611
    pass
 
1612
 
 
1613
class LzmaAppendTest(LzmaTest, AppendTestBase, unittest.TestCase):
 
1614
    pass
 
1615
 
 
1616
 
 
1617
class LimitsTest(unittest.TestCase):
 
1618
 
 
1619
    def test_ustar_limits(self):
 
1620
        # 100 char name
 
1621
        tarinfo = tarfile.TarInfo("0123456789" * 10)
 
1622
        tarinfo.tobuf(tarfile.USTAR_FORMAT)
 
1623
 
 
1624
        # 101 char name that cannot be stored
 
1625
        tarinfo = tarfile.TarInfo("0123456789" * 10 + "0")
 
1626
        self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT)
 
1627
 
 
1628
        # 256 char name with a slash at pos 156
 
1629
        tarinfo = tarfile.TarInfo("123/" * 62 + "longname")
 
1630
        tarinfo.tobuf(tarfile.USTAR_FORMAT)
 
1631
 
 
1632
        # 256 char name that cannot be stored
 
1633
        tarinfo = tarfile.TarInfo("1234567/" * 31 + "longname")
 
1634
        self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT)
 
1635
 
 
1636
        # 512 char name
 
1637
        tarinfo = tarfile.TarInfo("123/" * 126 + "longname")
 
1638
        self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT)
 
1639
 
 
1640
        # 512 char linkname
 
1641
        tarinfo = tarfile.TarInfo("longlink")
 
1642
        tarinfo.linkname = "123/" * 126 + "longname"
 
1643
        self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT)
 
1644
 
 
1645
        # uid > 8 digits
 
1646
        tarinfo = tarfile.TarInfo("name")
 
1647
        tarinfo.uid = 0o10000000
 
1648
        self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT)
 
1649
 
 
1650
    def test_gnu_limits(self):
 
1651
        tarinfo = tarfile.TarInfo("123/" * 126 + "longname")
 
1652
        tarinfo.tobuf(tarfile.GNU_FORMAT)
 
1653
 
 
1654
        tarinfo = tarfile.TarInfo("longlink")
 
1655
        tarinfo.linkname = "123/" * 126 + "longname"
 
1656
        tarinfo.tobuf(tarfile.GNU_FORMAT)
 
1657
 
 
1658
        # uid >= 256 ** 7
 
1659
        tarinfo = tarfile.TarInfo("name")
 
1660
        tarinfo.uid = 0o4000000000000000000
 
1661
        self.assertRaises(ValueError, tarinfo.tobuf, tarfile.GNU_FORMAT)
 
1662
 
 
1663
    def test_pax_limits(self):
 
1664
        tarinfo = tarfile.TarInfo("123/" * 126 + "longname")
 
1665
        tarinfo.tobuf(tarfile.PAX_FORMAT)
 
1666
 
 
1667
        tarinfo = tarfile.TarInfo("longlink")
 
1668
        tarinfo.linkname = "123/" * 126 + "longname"
 
1669
        tarinfo.tobuf(tarfile.PAX_FORMAT)
 
1670
 
 
1671
        tarinfo = tarfile.TarInfo("name")
 
1672
        tarinfo.uid = 0o4000000000000000000
 
1673
        tarinfo.tobuf(tarfile.PAX_FORMAT)
 
1674
 
 
1675
 
 
1676
class MiscTest(unittest.TestCase):
 
1677
 
 
1678
    def test_char_fields(self):
 
1679
        self.assertEqual(tarfile.stn("foo", 8, "ascii", "strict"),
 
1680
                         b"foo\0\0\0\0\0")
 
1681
        self.assertEqual(tarfile.stn("foobar", 3, "ascii", "strict"),
 
1682
                         b"foo")
 
1683
        self.assertEqual(tarfile.nts(b"foo\0\0\0\0\0", "ascii", "strict"),
 
1684
                         "foo")
 
1685
        self.assertEqual(tarfile.nts(b"foo\0bar\0", "ascii", "strict"),
 
1686
                         "foo")
 
1687
 
 
1688
    def test_read_number_fields(self):
 
1689
        # Issue 13158: Test if GNU tar specific base-256 number fields
 
1690
        # are decoded correctly.
 
1691
        self.assertEqual(tarfile.nti(b"0000001\x00"), 1)
 
1692
        self.assertEqual(tarfile.nti(b"7777777\x00"), 0o7777777)
 
1693
        self.assertEqual(tarfile.nti(b"\x80\x00\x00\x00\x00\x20\x00\x00"),
 
1694
                         0o10000000)
 
1695
        self.assertEqual(tarfile.nti(b"\x80\x00\x00\x00\xff\xff\xff\xff"),
 
1696
                         0xffffffff)
 
1697
        self.assertEqual(tarfile.nti(b"\xff\xff\xff\xff\xff\xff\xff\xff"),
 
1698
                         -1)
 
1699
        self.assertEqual(tarfile.nti(b"\xff\xff\xff\xff\xff\xff\xff\x9c"),
 
1700
                         -100)
 
1701
        self.assertEqual(tarfile.nti(b"\xff\x00\x00\x00\x00\x00\x00\x00"),
 
1702
                         -0x100000000000000)
 
1703
 
 
1704
    def test_write_number_fields(self):
 
1705
        self.assertEqual(tarfile.itn(1), b"0000001\x00")
 
1706
        self.assertEqual(tarfile.itn(0o7777777), b"7777777\x00")
 
1707
        self.assertEqual(tarfile.itn(0o10000000),
 
1708
                         b"\x80\x00\x00\x00\x00\x20\x00\x00")
 
1709
        self.assertEqual(tarfile.itn(0xffffffff),
 
1710
                         b"\x80\x00\x00\x00\xff\xff\xff\xff")
 
1711
        self.assertEqual(tarfile.itn(-1),
 
1712
                         b"\xff\xff\xff\xff\xff\xff\xff\xff")
 
1713
        self.assertEqual(tarfile.itn(-100),
 
1714
                         b"\xff\xff\xff\xff\xff\xff\xff\x9c")
 
1715
        self.assertEqual(tarfile.itn(-0x100000000000000),
 
1716
                         b"\xff\x00\x00\x00\x00\x00\x00\x00")
 
1717
 
 
1718
    def test_number_field_limits(self):
 
1719
        with self.assertRaises(ValueError):
 
1720
            tarfile.itn(-1, 8, tarfile.USTAR_FORMAT)
 
1721
        with self.assertRaises(ValueError):
 
1722
            tarfile.itn(0o10000000, 8, tarfile.USTAR_FORMAT)
 
1723
        with self.assertRaises(ValueError):
 
1724
            tarfile.itn(-0x10000000001, 6, tarfile.GNU_FORMAT)
 
1725
        with self.assertRaises(ValueError):
 
1726
            tarfile.itn(0x10000000000, 6, tarfile.GNU_FORMAT)
 
1727
 
 
1728
 
 
1729
class CommandLineTest(unittest.TestCase):
 
1730
 
 
1731
    def tarfilecmd(self, *args):
 
1732
        rc, out, err = script_helper.assert_python_ok('-m', 'tarfile', *args)
 
1733
        return out.replace(os.linesep.encode(), b'\n')
 
1734
 
 
1735
    def tarfilecmd_failure(self, *args):
 
1736
        return script_helper.assert_python_failure('-m', 'tarfile', *args)
 
1737
 
 
1738
    def make_simple_tarfile(self, tar_name):
 
1739
        files = [support.findfile('tokenize_tests.txt'),
 
1740
                 support.findfile('tokenize_tests-no-coding-cookie-'
 
1741
                                  'and-utf8-bom-sig-only.txt')]
 
1742
        self.addCleanup(support.unlink, tar_name)
 
1743
        with tarfile.open(tar_name, 'w') as tf:
 
1744
            for tardata in files:
 
1745
                tf.add(tardata, arcname=os.path.basename(tardata))
 
1746
 
 
1747
    def test_test_command(self):
 
1748
        for tar_name in testtarnames:
 
1749
            for opt in '-t', '--test':
 
1750
                out = self.tarfilecmd(opt, tar_name)
 
1751
                self.assertEqual(out, b'')
 
1752
 
 
1753
    def test_test_command_verbose(self):
 
1754
        for tar_name in testtarnames:
 
1755
            for opt in '-v', '--verbose':
 
1756
                out = self.tarfilecmd(opt, '-t', tar_name)
 
1757
                self.assertIn(b'is a tar archive.\n', out)
 
1758
 
 
1759
    def test_test_command_invalid_file(self):
 
1760
        zipname = support.findfile('zipdir.zip')
 
1761
        rc, out, err = self.tarfilecmd_failure('-t', zipname)
 
1762
        self.assertIn(b' is not a tar archive.', err)
 
1763
        self.assertEqual(out, b'')
 
1764
        self.assertEqual(rc, 1)
 
1765
 
 
1766
        for tar_name in testtarnames:
 
1767
            with self.subTest(tar_name=tar_name):
 
1768
                with open(tar_name, 'rb') as f:
 
1769
                    data = f.read()
 
1770
                try:
 
1771
                    with open(tmpname, 'wb') as f:
 
1772
                        f.write(data[:511])
 
1773
                    rc, out, err = self.tarfilecmd_failure('-t', tmpname)
 
1774
                    self.assertEqual(out, b'')
 
1775
                    self.assertEqual(rc, 1)
 
1776
                finally:
 
1777
                    support.unlink(tmpname)
 
1778
 
 
1779
    def test_list_command(self):
 
1780
        self.make_simple_tarfile(tmpname)
 
1781
        with support.captured_stdout() as t:
 
1782
            with tarfile.open(tmpname, 'r') as tf:
 
1783
                tf.list(verbose=False)
 
1784
        expected = t.getvalue().encode(sys.getfilesystemencoding())
 
1785
        for opt in '-l', '--list':
 
1786
            out = self.tarfilecmd(opt, tmpname)
 
1787
            self.assertEqual(out, expected)
 
1788
 
 
1789
    def test_list_command_verbose(self):
 
1790
        self.make_simple_tarfile(tmpname)
 
1791
        with support.captured_stdout() as t:
 
1792
            with tarfile.open(tmpname, 'r') as tf:
 
1793
                tf.list(verbose=True)
 
1794
        expected = t.getvalue().encode(sys.getfilesystemencoding())
 
1795
        for opt in '-v', '--verbose':
 
1796
            out = self.tarfilecmd(opt, '-l', tmpname)
 
1797
            self.assertEqual(out, expected)
 
1798
 
 
1799
    def test_list_command_invalid_file(self):
 
1800
        zipname = support.findfile('zipdir.zip')
 
1801
        rc, out, err = self.tarfilecmd_failure('-l', zipname)
 
1802
        self.assertIn(b' is not a tar archive.', err)
 
1803
        self.assertEqual(out, b'')
 
1804
        self.assertEqual(rc, 1)
 
1805
 
 
1806
    def test_create_command(self):
 
1807
        files = [support.findfile('tokenize_tests.txt'),
 
1808
                 support.findfile('tokenize_tests-no-coding-cookie-'
 
1809
                                  'and-utf8-bom-sig-only.txt')]
 
1810
        for opt in '-c', '--create':
 
1811
            try:
 
1812
                out = self.tarfilecmd(opt, tmpname, *files)
 
1813
                self.assertEqual(out, b'')
 
1814
                with tarfile.open(tmpname) as tar:
 
1815
                    tar.getmembers()
 
1816
            finally:
 
1817
                support.unlink(tmpname)
 
1818
 
 
1819
    def test_create_command_verbose(self):
 
1820
        files = [support.findfile('tokenize_tests.txt'),
 
1821
                 support.findfile('tokenize_tests-no-coding-cookie-'
 
1822
                                  'and-utf8-bom-sig-only.txt')]
 
1823
        for opt in '-v', '--verbose':
 
1824
            try:
 
1825
                out = self.tarfilecmd(opt, '-c', tmpname, *files)
 
1826
                self.assertIn(b' file created.', out)
 
1827
                with tarfile.open(tmpname) as tar:
 
1828
                    tar.getmembers()
 
1829
            finally:
 
1830
                support.unlink(tmpname)
 
1831
 
 
1832
    def test_create_command_dotless_filename(self):
 
1833
        files = [support.findfile('tokenize_tests.txt')]
 
1834
        try:
 
1835
            out = self.tarfilecmd('-c', dotlessname, *files)
 
1836
            self.assertEqual(out, b'')
 
1837
            with tarfile.open(dotlessname) as tar:
 
1838
                tar.getmembers()
 
1839
        finally:
 
1840
            support.unlink(dotlessname)
 
1841
 
 
1842
    def test_create_command_dot_started_filename(self):
 
1843
        tar_name = os.path.join(TEMPDIR, ".testtar")
 
1844
        files = [support.findfile('tokenize_tests.txt')]
 
1845
        try:
 
1846
            out = self.tarfilecmd('-c', tar_name, *files)
 
1847
            self.assertEqual(out, b'')
 
1848
            with tarfile.open(tar_name) as tar:
 
1849
                tar.getmembers()
 
1850
        finally:
 
1851
            support.unlink(tar_name)
 
1852
 
 
1853
    def test_extract_command(self):
 
1854
        self.make_simple_tarfile(tmpname)
 
1855
        for opt in '-e', '--extract':
 
1856
            try:
 
1857
                with support.temp_cwd(tarextdir):
 
1858
                    out = self.tarfilecmd(opt, tmpname)
 
1859
                self.assertEqual(out, b'')
 
1860
            finally:
 
1861
                support.rmtree(tarextdir)
 
1862
 
 
1863
    def test_extract_command_verbose(self):
 
1864
        self.make_simple_tarfile(tmpname)
 
1865
        for opt in '-v', '--verbose':
 
1866
            try:
 
1867
                with support.temp_cwd(tarextdir):
 
1868
                    out = self.tarfilecmd(opt, '-e', tmpname)
 
1869
                self.assertIn(b' file is extracted.', out)
 
1870
            finally:
 
1871
                support.rmtree(tarextdir)
 
1872
 
 
1873
    def test_extract_command_different_directory(self):
 
1874
        self.make_simple_tarfile(tmpname)
 
1875
        try:
 
1876
            with support.temp_cwd(tarextdir):
 
1877
                out = self.tarfilecmd('-e', tmpname, 'spamdir')
 
1878
            self.assertEqual(out, b'')
 
1879
        finally:
 
1880
            support.rmtree(tarextdir)
 
1881
 
 
1882
    def test_extract_command_invalid_file(self):
 
1883
        zipname = support.findfile('zipdir.zip')
 
1884
        with support.temp_cwd(tarextdir):
 
1885
            rc, out, err = self.tarfilecmd_failure('-e', zipname)
 
1886
        self.assertIn(b' is not a tar archive.', err)
 
1887
        self.assertEqual(out, b'')
 
1888
        self.assertEqual(rc, 1)
 
1889
 
 
1890
 
 
1891
class ContextManagerTest(unittest.TestCase):
 
1892
 
 
1893
    def test_basic(self):
 
1894
        with tarfile.open(tarname) as tar:
 
1895
            self.assertFalse(tar.closed, "closed inside runtime context")
 
1896
        self.assertTrue(tar.closed, "context manager failed")
 
1897
 
 
1898
    def test_closed(self):
 
1899
        # The __enter__() method is supposed to raise OSError
 
1900
        # if the TarFile object is already closed.
 
1901
        tar = tarfile.open(tarname)
 
1902
        tar.close()
 
1903
        with self.assertRaises(OSError):
 
1904
            with tar:
 
1905
                pass
 
1906
 
 
1907
    def test_exception(self):
 
1908
        # Test if the OSError exception is passed through properly.
 
1909
        with self.assertRaises(Exception) as exc:
 
1910
            with tarfile.open(tarname) as tar:
 
1911
                raise OSError
 
1912
        self.assertIsInstance(exc.exception, OSError,
 
1913
                              "wrong exception raised in context manager")
 
1914
        self.assertTrue(tar.closed, "context manager failed")
 
1915
 
 
1916
    def test_no_eof(self):
 
1917
        # __exit__() must not write end-of-archive blocks if an
 
1918
        # exception was raised.
 
1919
        try:
 
1920
            with tarfile.open(tmpname, "w") as tar:
 
1921
                raise Exception
 
1922
        except:
 
1923
            pass
 
1924
        self.assertEqual(os.path.getsize(tmpname), 0,
 
1925
                "context manager wrote an end-of-archive block")
 
1926
        self.assertTrue(tar.closed, "context manager failed")
 
1927
 
 
1928
    def test_eof(self):
 
1929
        # __exit__() must write end-of-archive blocks, i.e. call
 
1930
        # TarFile.close() if there was no error.
 
1931
        with tarfile.open(tmpname, "w"):
 
1932
            pass
 
1933
        self.assertNotEqual(os.path.getsize(tmpname), 0,
 
1934
                "context manager wrote no end-of-archive block")
 
1935
 
 
1936
    def test_fileobj(self):
 
1937
        # Test that __exit__() did not close the external file
 
1938
        # object.
 
1939
        with open(tmpname, "wb") as fobj:
 
1940
            try:
 
1941
                with tarfile.open(fileobj=fobj, mode="w") as tar:
 
1942
                    raise Exception
 
1943
            except:
 
1944
                pass
 
1945
            self.assertFalse(fobj.closed, "external file object was closed")
 
1946
            self.assertTrue(tar.closed, "context manager failed")
 
1947
 
 
1948
 
 
1949
@unittest.skipIf(hasattr(os, "link"), "requires os.link to be missing")
 
1950
class LinkEmulationTest(ReadTest, unittest.TestCase):
 
1951
 
 
1952
    # Test for issue #8741 regression. On platforms that do not support
 
1953
    # symbolic or hard links tarfile tries to extract these types of members
 
1954
    # as the regular files they point to.
 
1955
    def _test_link_extraction(self, name):
 
1956
        self.tar.extract(name, TEMPDIR)
 
1957
        with open(os.path.join(TEMPDIR, name), "rb") as f:
 
1958
            data = f.read()
 
1959
        self.assertEqual(md5sum(data), md5_regtype)
 
1960
 
 
1961
    # See issues #1578269, #8879, and #17689 for some history on these skips
 
1962
    @unittest.skipIf(hasattr(os.path, "islink"),
 
1963
                     "Skip emulation - has os.path.islink but not os.link")
 
1964
    def test_hardlink_extraction1(self):
 
1965
        self._test_link_extraction("ustar/lnktype")
 
1966
 
 
1967
    @unittest.skipIf(hasattr(os.path, "islink"),
 
1968
                     "Skip emulation - has os.path.islink but not os.link")
 
1969
    def test_hardlink_extraction2(self):
 
1970
        self._test_link_extraction("./ustar/linktest2/lnktype")
 
1971
 
 
1972
    @unittest.skipIf(hasattr(os, "symlink"),
 
1973
                     "Skip emulation if symlink exists")
 
1974
    def test_symlink_extraction1(self):
 
1975
        self._test_link_extraction("ustar/symtype")
 
1976
 
 
1977
    @unittest.skipIf(hasattr(os, "symlink"),
 
1978
                     "Skip emulation if symlink exists")
 
1979
    def test_symlink_extraction2(self):
 
1980
        self._test_link_extraction("./ustar/linktest2/symtype")
 
1981
 
 
1982
 
 
1983
class Bz2PartialReadTest(Bz2Test, unittest.TestCase):
 
1984
    # Issue5068: The _BZ2Proxy.read() method loops forever
 
1985
    # on an empty or partial bzipped file.
 
1986
 
 
1987
    def _test_partial_input(self, mode):
 
1988
        class MyBytesIO(io.BytesIO):
 
1989
            hit_eof = False
 
1990
            def read(self, n):
 
1991
                if self.hit_eof:
 
1992
                    raise AssertionError("infinite loop detected in "
 
1993
                                         "tarfile.open()")
 
1994
                self.hit_eof = self.tell() == len(self.getvalue())
 
1995
                return super(MyBytesIO, self).read(n)
 
1996
            def seek(self, *args):
 
1997
                self.hit_eof = False
 
1998
                return super(MyBytesIO, self).seek(*args)
 
1999
 
 
2000
        data = bz2.compress(tarfile.TarInfo("foo").tobuf())
 
2001
        for x in range(len(data) + 1):
 
2002
            try:
 
2003
                tarfile.open(fileobj=MyBytesIO(data[:x]), mode=mode)
 
2004
            except tarfile.ReadError:
 
2005
                pass # we have no interest in ReadErrors
 
2006
 
 
2007
    def test_partial_input(self):
 
2008
        self._test_partial_input("r")
 
2009
 
 
2010
    def test_partial_input_bz2(self):
 
2011
        self._test_partial_input("r:bz2")
 
2012
 
 
2013
 
 
2014
def setUpModule():
 
2015
    support.unlink(TEMPDIR)
 
2016
    os.makedirs(TEMPDIR)
 
2017
 
 
2018
    global testtarnames
 
2019
    testtarnames = [tarname]
 
2020
    with open(tarname, "rb") as fobj:
 
2021
        data = fobj.read()
 
2022
 
 
2023
    # Create compressed tarfiles.
 
2024
    for c in GzipTest, Bz2Test, LzmaTest:
 
2025
        if c.open:
 
2026
            support.unlink(c.tarname)
 
2027
            testtarnames.append(c.tarname)
 
2028
            with c.open(c.tarname, "wb") as tar:
 
2029
                tar.write(data)
 
2030
 
 
2031
def tearDownModule():
 
2032
    if os.path.exists(TEMPDIR):
 
2033
        shutil.rmtree(TEMPDIR)
 
2034
 
 
2035
if __name__ == "__main__":
 
2036
    unittest.main()