~moritzm/duplicity/duplicity

« back to all changes in this revision

Viewing changes to testing/test_tarfile.py

  • Committer: bescoto
  • Date: 2002-10-29 01:49:46 UTC
  • Revision ID: vcs-imports@canonical.com-20021029014946-3m4rmm5plom7pl6q
Initial checkin

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# unittest for the tarfile module
 
2
#
 
3
# $Id: test_tarfile.py,v 1.1 2002/10/29 01:49:47 bescoto Exp $
 
4
 
 
5
from __future__ import generators
 
6
import sys
 
7
import os
 
8
import shutil
 
9
import StringIO
 
10
import tempfile
 
11
 
 
12
import unittest
 
13
import stat
 
14
sys.path.insert(0, "../src")
 
15
import tarfile
 
16
 
 
17
SAMPLETAR = "testtar.tar"
 
18
TEMPDIR   = tempfile.mktemp()
 
19
 
 
20
def join(*args):
 
21
    return os.path.normpath(apply(os.path.join, args))
 
22
 
 
23
class BaseTest(unittest.TestCase):
 
24
    """Base test for tarfile.
 
25
    """
 
26
 
 
27
    def setUp(self):
 
28
        os.mkdir(TEMPDIR)
 
29
        self.tar = tarfile.open(SAMPLETAR)
 
30
        self.tar.errorlevel = 1
 
31
 
 
32
    def tearDown(self):
 
33
        self.tar.close()
 
34
        shutil.rmtree(TEMPDIR)
 
35
 
 
36
    def isroot(self):
 
37
        return hasattr(os, "geteuid") and os.geteuid() == 0
 
38
 
 
39
class Test_All(BaseTest):
 
40
    """Allround test.
 
41
    """
 
42
    files_in_tempdir = ["tempdir", "tempdir/0length",
 
43
                        "tempdir/large",
 
44
                        "tempdir/hardlinked1", "tempdir/hardlinked2",
 
45
                        "tempdir/fifo",
 
46
                        "tempdir/symlink"]
 
47
 
 
48
    tempdir_data = {"0length": "",
 
49
                     "large": "hello, world!" * 10000,
 
50
                     "hardlinked1": "foo",
 
51
                     "hardlinked2": "foo"}
 
52
 
 
53
    def test_iteration(self):
 
54
        """Test iteration through temp2.tar"""
 
55
        i = 0
 
56
        tf = tarfile.TarFile("none", "r", FileLogger(open("temp2.tar", "rb")))
 
57
        tf.debug = 3
 
58
        for tarinfo in tf: i += 1
 
59
        assert i >= 6, i
 
60
 
 
61
 
 
62
    def _test_extraction(self):
 
63
        """Test if regular files and links are extracted correctly.
 
64
        """
 
65
        for tarinfo in self.tar:
 
66
            if tarinfo.isreg() or tarinfo.islnk() or tarinfo.issym():
 
67
                self.tar.extract(tarinfo, TEMPDIR)
 
68
                name  = join(TEMPDIR, tarinfo.name)
 
69
                data1 = file(name, "rb").read()
 
70
                data2 = self.tar.extractfile(tarinfo).read()
 
71
                self.assert_(data1 == data2,
 
72
                             "%s was not extracted successfully."
 
73
                             % tarinfo.name)
 
74
 
 
75
                if not tarinfo.issym():
 
76
                    self.assert_(tarinfo.mtime == os.path.getmtime(name),
 
77
                                "%s's modification time was not set correctly."
 
78
                                % tarinfo.name)
 
79
 
 
80
            if tarinfo.isdev():
 
81
                if hasattr(os, "mkfifo") and tarinfo.isfifo():
 
82
                    self.tar.extract(tarinfo, TEMPDIR)
 
83
                    name = join(TEMPDIR, tarinfo.name)
 
84
                    self.assert_(tarinfo.mtime == os.path.getmtime(name),
 
85
                                "%s's modification time was not set correctly."
 
86
                                % tarinfo.name)
 
87
 
 
88
                elif hasattr(os, "mknod") and self.isroot():
 
89
                    self.tar.extract(tarinfo, TEMPDIR)
 
90
                    name = join(TEMPDIR, tarinfo.name)
 
91
                    self.assert_(tarinfo.mtime == os.path.getmtime(name),
 
92
                                "%s's modification time was not set correctly."
 
93
                                % tarinfo.name)
 
94
 
 
95
    def test_addition(self):
 
96
        """Test if regular files are added correctly.
 
97
           For this, we extract all regular files from our sample tar
 
98
           and add them to a new one, which we check afterwards.
 
99
        """
 
100
        files = []
 
101
        for tarinfo in self.tar:
 
102
            if tarinfo.isreg():
 
103
                self.tar.extract(tarinfo, TEMPDIR)
 
104
                files.append(tarinfo.name)
 
105
 
 
106
        buf = StringIO.StringIO()
 
107
        tar = tarfile.open("test.tar", "w", buf)
 
108
        for f in files:
 
109
            path = join(TEMPDIR, f)
 
110
            tarinfo = tar.gettarinfo(path)
 
111
            tarinfo.name = f
 
112
            tar.addfile(tarinfo, file(path, "rb"))
 
113
        tar.close()
 
114
 
 
115
        buf.seek(0)
 
116
        tar = tarfile.open("test.tar", "r", buf)
 
117
        for tarinfo in tar:
 
118
            data1 = file(join(TEMPDIR, tarinfo.name), "rb").read()
 
119
            data2 = tar.extractfile(tarinfo).read()
 
120
            self.assert_(data1 == data2)
 
121
        tar.close()
 
122
 
 
123
    def make_tempdir(self):
 
124
        """Make a temp directory with assorted files in it"""
 
125
        try: os.lstat("tempdir")
 
126
        except OSError: pass
 
127
        else: # assume already exists
 
128
            assert not os.system("rm -r tempdir")
 
129
        os.mkdir("tempdir")
 
130
 
 
131
        def write_file(name):
 
132
            """Write appropriate data into file named name in tempdir"""
 
133
            fp = open("tempdir/%s" % (name,), "wb")
 
134
            fp.write(self.tempdir_data[name])
 
135
            fp.close()
 
136
 
 
137
        # Make 0length file
 
138
        write_file("0length")
 
139
        os.chmod("tempdir/%s" % ("0length",), 0604)
 
140
 
 
141
        # Make regular file 130000 bytes in length
 
142
        write_file("large")
 
143
 
 
144
        # Make hard linked files
 
145
        write_file("hardlinked1")
 
146
        os.link("tempdir/hardlinked1", "tempdir/hardlinked2")
 
147
 
 
148
        # Make a fifo
 
149
        os.mkfifo("tempdir/fifo")
 
150
 
 
151
        # Make symlink
 
152
        os.symlink("foobar", "tempdir/symlink")
 
153
 
 
154
    def make_temptar(self):
 
155
        """Tar up tempdir, write to "temp2.tar" """
 
156
        try: os.lstat("temp2.tar")
 
157
        except OSError: pass
 
158
        else: assert not os.system("rm temp2.tar")
 
159
 
 
160
        self.make_tempdir()
 
161
        tf = tarfile.TarFile("temp2.tar", "w")
 
162
        for filename in self.files_in_tempdir:
 
163
            tf.add(filename, filename, 0)
 
164
        tf.close()
 
165
 
 
166
    def make_temptar_iterator(self):
 
167
        """Tar up tempdir using an iterator"""
 
168
        try: os.lstat("temp2.tar")
 
169
        except OSError: pass
 
170
        else: assert not os.system("rm temp2.tar")
 
171
 
 
172
        self.make_tempdir()
 
173
        def generate_pairs(tfi_list):
 
174
            for filename in self.files_in_tempdir:
 
175
                ti = tarfile.TarInfo()
 
176
                ti.set_arcname(filename)
 
177
                ti.init_from_stat(os.lstat(filename))
 
178
                if filename == "tempdir/hardlinked2":
 
179
                    ti.type = tarfile.LNKTYPE
 
180
                    ti.linkname = "tempdir/hardlinked1"
 
181
                    yield (ti, None)
 
182
                elif filename == "tempdir" or filename == "tempdir/fifo":
 
183
                    yield (ti, None)
 
184
                elif filename == "tempdir/symlink":
 
185
                    ti.linkname = os.readlink(filename)
 
186
                    yield (ti, None)
 
187
                else: yield (ti, open(filename, "rb"))
 
188
        tfi_list = [None]
 
189
        tfi = tarfile.TarFromIterator(generate_pairs(tfi_list))
 
190
        tfi_list[0] = tfi # now generate_pairs can find tfi
 
191
  
 
192
        buf = tfi.read()
 
193
        tfi.close()
 
194
        fout = open("temp2.tar", "wb")
 
195
        fout.write(buf)
 
196
        fout.close()
 
197
 
 
198
    def test_tarfile_creation(self):
 
199
        """Create directory, make tarfile, extract using gnutar, compare"""
 
200
        self.make_temptar()
 
201
        self.extract_and_compare_tarfile()
 
202
 
 
203
    def test_tarfile_creation_from_iterator(self):
 
204
        """Same as test_tarfile_creation, but use iterator interface"""
 
205
        self.make_temptar_iterator()
 
206
        self.extract_and_compare_tarfile()
 
207
 
 
208
    def extract_and_compare_tarfile(self):
 
209
        os.system("rm -r tempdir")
 
210
        assert not os.system("tar -xf temp2.tar")
 
211
 
 
212
        def compare_data(name):
 
213
            """Assert data is what should be"""
 
214
            fp = open("tempdir/" + name, "rb")
 
215
            buf = fp.read()
 
216
            fp.close()
 
217
            assert buf == self.tempdir_data[name]
 
218
 
 
219
        s = os.lstat("tempdir")
 
220
        assert stat.S_ISDIR(s.st_mode)
 
221
 
 
222
        for key in self.tempdir_data: compare_data(key)
 
223
 
 
224
        # Check to make sure permissions saved
 
225
        s = os.lstat("tempdir/0length")
 
226
        assert stat.S_IMODE(s.st_mode) == 0604, stat.S_IMODE(s.st_mode)
 
227
 
 
228
        s = os.lstat("tempdir/fifo")
 
229
        assert stat.S_ISFIFO(s.st_mode)
 
230
 
 
231
        # Check to make sure hardlinked files still hardlinked
 
232
        s1 = os.lstat("tempdir/hardlinked1")
 
233
        s2 = os.lstat("tempdir/hardlinked2")
 
234
        assert s1.st_ino == s2.st_ino
 
235
 
 
236
        # Check symlink
 
237
        s = os.lstat("tempdir/symlink")
 
238
        assert stat.S_ISLNK(s.st_mode)
 
239
        
 
240
 
 
241
class Test_FObj(BaseTest):
 
242
    """Test for read operations via file-object.
 
243
    """
 
244
 
 
245
    def _test_sparse(self):
 
246
        """Test extraction of the sparse file.
 
247
        """
 
248
        BLOCK = 4096
 
249
        for tarinfo in self.tar:
 
250
            if tarinfo.issparse():
 
251
                f = self.tar.extractfile(tarinfo)
 
252
                b = 0
 
253
                block = 0
 
254
                while 1:
 
255
                    buf = f.read(BLOCK)
 
256
                    if not buf:
 
257
                        break
 
258
                    block += 1
 
259
                    self.assert_(BLOCK == len(buf))
 
260
                    if not b:
 
261
                        self.assert_("\0" * BLOCK == buf,
 
262
                                     "sparse block is broken")
 
263
                    else:
 
264
                        self.assert_("0123456789ABCDEF" * 256 == buf,
 
265
                                     "sparse block is broken")
 
266
                    b = 1 - b
 
267
                self.assert_(block == 24, "too few sparse blocks")
 
268
                f.close()
 
269
 
 
270
    def _test_readlines(self):
 
271
        """Test readlines() method of _FileObject.
 
272
        """
 
273
        self.tar.extract("pep.txt", TEMPDIR)
 
274
        lines1 = file(join(TEMPDIR, "pep.txt"), "r").readlines()
 
275
        lines2 = self.tar.extractfile("pep.txt").readlines()
 
276
        self.assert_(lines1 == lines2, "readline() does not work correctly")
 
277
 
 
278
    def _test_seek(self):
 
279
        """Test seek() method of _FileObject, incl. random reading.
 
280
        """
 
281
        self.tar.extract("pep.txt", TEMPDIR)
 
282
        data = file(join(TEMPDIR, "pep.txt"), "rb").read()
 
283
 
 
284
        tarinfo = self.tar.getmember("pep.txt")
 
285
        fobj = self.tar.extractfile(tarinfo)
 
286
 
 
287
        text = fobj.read()
 
288
        fobj.seek(0)
 
289
        self.assert_(0 == fobj.tell(),
 
290
                     "seek() to file's start failed")
 
291
        fobj.seek(4096, 0)
 
292
        self.assert_(4096 == fobj.tell(),
 
293
                     "seek() to absolute position failed")
 
294
        fobj.seek(-2048, 1)
 
295
        self.assert_(2048 == fobj.tell(),
 
296
                     "seek() to negative relative position failed")
 
297
        fobj.seek(2048, 1)
 
298
        self.assert_(4096 == fobj.tell(),
 
299
                     "seek() to positive relative position failed")
 
300
        s = fobj.read(10)
 
301
        self.assert_(s == data[4096:4106],
 
302
                     "read() after seek failed")
 
303
        fobj.seek(0, 2)
 
304
        self.assert_(tarinfo.size == fobj.tell(),
 
305
                     "seek() to file's end failed")
 
306
        self.assert_(fobj.read() == "",
 
307
                     "read() at file's end did not return empty string")
 
308
        fobj.seek(-tarinfo.size, 2)
 
309
        self.assert_(0 == fobj.tell(),
 
310
                     "relative seek() to file's start failed")
 
311
        fobj.seek(1024)
 
312
        s1 = fobj.readlines()
 
313
        fobj.seek(1024)
 
314
        s2 = fobj.readlines()
 
315
        self.assert_(s1 == s2,
 
316
                     "readlines() after seek failed")
 
317
        fobj.close()
 
318
 
 
319
class FileLogger:
 
320
    """Like a file but log requests"""
 
321
    def __init__(self, infp):
 
322
        self.infp = infp
 
323
    def read(self, length):
 
324
        print "Reading ", length
 
325
        return self.infp.read(length)
 
326
    def seek(self, position):
 
327
        print "Seeking to ", position
 
328
        return self.infp.seek(position)
 
329
    def close(self):
 
330
        print "Closing"
 
331
        return self.infp.close()
 
332
    
 
333
 
 
334
 
 
335
if __name__ == "__main__":
 
336
    # Run all tests
 
337
    unittest.main()