~moritzm/duplicity/duplicity

« back to all changes in this revision

Viewing changes to testing/diffdirtest.py

  • Committer: bescoto
  • Date: 2002-10-29 01:49:46 UTC
  • Revision ID: vcs-imports@canonical.com-20021029014946-3m4rmm5plom7pl6q
Initial checkin

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
from __future__ import generators
 
2
import sys
 
3
# Put ../src at the front of the module search path before the imports
# below run.  The path is relative, so it is resolved against the current
# working directory (presumably the project modules imported below live
# there -- confirm against the repository layout).
sys.path.insert(0, "../src")
 
4
import os, unittest
 
5
from path import *
 
6
import diffdir, selection, tarfile, log
 
7
 
 
8
# Runs at import time, so the setting applies to every test in this file.
# NOTE(review): 7 is presumably a verbose/debug level -- confirm against
# log.setverbosity.
log.setverbosity(7)
 
9
 
 
10
class DDTest(unittest.TestCase):
    """Test functions in diffdir.py

    All paths are relative to the current directory: input trees are
    read from below testfiles/ and results are written below
    testfiles/output, which every test recreates through deltmp()
    before doing anything else.

    Calls that have side effects (closing files, running shell
    commands) are made outside of assert statements, so they still
    happen when assertions are disabled with python -O.

    """
    def copyfileobj(self, infp, outfp):
        """Copy in fileobj to out, closing afterwards

        Data is moved in 32 KiB reads until infp returns an empty
        result.  Both objects are then closed, and a true value
        returned by either close() raises AssertionError.

        """
        blocksize = 32 * 1024
        while 1:
            buf = infp.read(blocksize)
            if not buf: break
            outfp.write(buf)
        # Close first, check afterwards: both files get closed even if
        # the first close() reports a problem
        in_status = infp.close()
        out_status = outfp.close()
        assert not in_status, in_status
        assert not out_status, out_status

    def deltmp(self):
        """Delete temporary directories

        Removes testfiles/output with everything below it and then
        creates it again as an empty directory.

        """
        status = os.system("rm -rf testfiles/output")
        assert not status, status
        os.mkdir("testfiles/output")

    def _read_binary(self, filename):
        """Return the whole contents of filename, read in binary mode"""
        fp = open(filename, "rb")
        try: return fp.read()
        finally: fp.close()

    def testsig(self):
        """Test producing tar signature of various file types"""
        self.deltmp()
        select = selection.Select(Path("testfiles/various_file_types"))
        select.set_iter()
        sigtar = diffdir.SigTarBlockIter(select)
        diffdir.write_block_iter(sigtar, "testfiles/output/sigtar")

        # Count the members of the signature tar that was just written
        count = 0
        for tarinfo in tarfile.TarFile("testfiles/output/sigtar", "r"):
            count += 1
        assert count >= 5, "There should be at least 5 files in sigtar"

    def test_empty_diff(self):
        """Test producing a diff against same sig; should be len 0"""
        self.deltmp()
        select = selection.Select(Path("testfiles/various_file_types"))
        select.set_iter()
        sigtar = diffdir.SigTarBlockIter(select)
        diffdir.write_block_iter(sigtar, "testfiles/output/sigtar")

        # The signature tar is binary data; open it in binary mode
        # like test_diff and test_diff2 do
        sigtar_fp = open("testfiles/output/sigtar", "rb")
        select2 = selection.Select(Path("testfiles/various_file_types"))
        select2.set_iter()
        diffdir.write_block_iter(diffdir.DirDelta(select2, sigtar_fp),
                                 "testfiles/output/difftar")

        size = os.stat("testfiles/output/difftar").st_size
        assert size == 0 or size == 10240, size # 10240 is size of one record

    def test_diff(self):
        """Test making a diff

        Writes a signature of testfiles/dir1, diffs testfiles/dir2
        against it and checks that every name listed in changed_files
        shows up in the resulting delta tar.

        """
        self.deltmp()
        sel1 = selection.Select(Path("testfiles/dir1"))
        diffdir.write_block_iter(diffdir.SigTarBlockIter(sel1.set_iter()),
                                 "testfiles/output/dir1.sigtar")

        sigtar_fp = open("testfiles/output/dir1.sigtar", "rb")
        sel2 = selection.Select(Path("testfiles/dir2"))
        delta_tar = diffdir.DirDelta(sel2.set_iter(), sigtar_fp)
        diffdir.write_block_iter(delta_tar,
                                 "testfiles/output/dir1dir2.difftar")

        changed_files = ["diff/changeable_permission",
                         "diff/regular_file",
                         "snapshot/symbolic_link/",
                         "deleted/deleted_file",
                         "snapshot/directory_to_file",
                         "snapshot/file_to_directory/"]
        # Tick off each expected name as it is met; whatever is left
        # over at the end was missing from the delta
        for tarinfo in tarfile.TarFile("testfiles/output/dir1dir2.difftar",
                                       "r"):
            if tarinfo.name in changed_files:
                changed_files.remove(tarinfo.name)
        assert not changed_files, ("Following files not found:\n"
                                   + "\n".join(changed_files))

    def test_diff2(self):
        """Another diff test - this one involves multivol support

        The data of all delta tar members whose name starts with
        multivol_diff/ is concatenated in archive order, applied to
        testfiles/dir2/largefile with rdiff patch, and the result is
        compared with testfiles/dir3/largefile.

        """
        self.deltmp()
        sel1 = selection.Select(Path("testfiles/dir2"))
        diffdir.write_block_iter(diffdir.SigTarBlockIter(sel1.set_iter()),
                                 "testfiles/output/dir2.sigtar")

        sigtar_fp = open("testfiles/output/dir2.sigtar", "rb")
        sel2 = selection.Select(Path("testfiles/dir3"))
        delta_tar = diffdir.DirDelta(sel2.set_iter(), sigtar_fp)
        diffdir.write_block_iter(delta_tar,
                                 "testfiles/output/dir2dir3.difftar")

        # Collect the pieces in a list and join once instead of
        # growing a string with += for every member
        chunks = []
        tf = tarfile.TarFile("testfiles/output/dir2dir3.difftar", "r")
        for tarinfo in tf:
            if tarinfo.name.startswith("multivol_diff/"):
                chunks.append(tf.extractfile(tarinfo).read())
        delta = "".join(chunks)
        assert 3000000 < len(delta) < 4000000, len(delta)

        fout = open("testfiles/output/largefile.delta", "wb")
        try: fout.write(delta)
        finally: fout.close()

        status = os.system("rdiff patch testfiles/dir2/largefile "
                           "testfiles/output/largefile.delta "
                           "testfiles/output/largefile.patched")
        assert not status, status
        dir3large = self._read_binary("testfiles/dir3/largefile")
        patchedlarge = self._read_binary("testfiles/output/largefile.patched")
        assert dir3large == patchedlarge

    def test_dirdelta_write_sig(self):
        """Test simultaneous delta and sig generation

        Generate signatures and deltas of dirs1, 2, 3, 4 and compare
        those produced by DirDelta_WriteSig and other methods.

        For each of dir2, dir3 and dir4 two deltas are written: delta1
        by DirDelta against the full signature of the previous
        directory, and delta2 by DirDelta_WriteSig against the full
        signature of dir1 plus every incremental signature written so
        far.  The two deltas must be identical.

        """
        self.deltmp()
        cur_full_sigs = Path("testfiles/output/fullsig.dir1")

        def get_sel(dirpath):
            return selection.Select(dirpath).set_iter()

        cur_dir = Path("testfiles/dir1")
        diffdir.write_block_iter(diffdir.SigTarBlockIter(get_sel(cur_dir)),
                                 cur_full_sigs)

        sigstack = [cur_full_sigs]
        for dirname in ['dir2', 'dir3', 'dir4']:
            # Same text as "print 'Processing ', dirname", written
            # without relying on the print statement
            sys.stdout.write("Processing  %s\n" % dirname)
            cur_dir = Path("testfiles/" + dirname)

            old_full_sigs = cur_full_sigs
            cur_full_sigs = Path("testfiles/output/fullsig." + dirname)

            delta1 = Path("testfiles/output/delta1." + dirname)
            delta2 = Path("testfiles/output/delta2." + dirname)
            incsig = Path("testfiles/output/incsig." + dirname)

            # Write old-style delta to delta1
            diffdir.write_block_iter(
                diffdir.DirDelta(get_sel(cur_dir), old_full_sigs.open("rb")),
                delta1)

            # Write new signature to incsig and delta to delta2, compare
            block_iter = diffdir.DirDelta_WriteSig(
                get_sel(cur_dir),
                [sig.open("rb") for sig in sigstack],
                incsig.open("wb"))
            sigstack.append(incsig)
            diffdir.write_block_iter(block_iter, delta2)

            sys.stdout.write("%s %s\n" % (delta1.name, delta2.name))
            compare_tar(delta1.open("rb"), delta2.open("rb"))
            status = os.system("cmp %s %s" % (delta1.name, delta2.name))
            assert not status, status

            # Write old-style signature to cur_full_sigs
            diffdir.write_block_iter(
                diffdir.SigTarBlockIter(get_sel(cur_dir)), cur_full_sigs)

    def test_combine_path_iters(self):
        """Test diffdir.combine_path_iters

        Three iterators of index-carrying dummies are combined.  The
        checks below expect each index once, in increasing order, and
        for an index present in several iterators the element of the
        iterator listed last.

        """
        class Dummy:
            def __init__(self, index, other = None):
                self.index = index
                self.other = other
            def __repr__(self): return "(%s %s)" % (self.index, self.other)

        def get_iter1():
            yield Dummy(())
            yield Dummy((1,))
            yield Dummy((1, 5), 2)

        def get_iter2():
            yield Dummy((), 2)
            yield Dummy((1,5))
            yield Dummy((2,))

        def get_iter3():
            yield Dummy((), 3)
            yield Dummy((2,), 1)

        result = diffdir.combine_path_iters([get_iter1(),
                                             get_iter2(),
                                             get_iter3()])
        elem1 = result.next()
        assert elem1.index == () and elem1.other == 3, elem1
        elem2 = result.next()
        assert elem2.index == (1,) and elem2.other is None, elem2
        elem3 = result.next()
        assert elem3.index == (1,5) and elem3.other is None, elem3
        elem4 = result.next()
        assert elem4.index == (2,) and elem4.other == 1, elem4
        # Nothing may follow the fourth element
        try: elem5 = result.next()
        except StopIteration: pass
        else: assert 0, elem5
 
198
 
 
199
 
 
200
def compare_tar(tarfile1, tarfile2):
    """Compare two tarfiles

    tarfile1 and tarfile2 are open file objects, each holding a tar
    archive.  The archives are read in step: every pair of members
    must satisfy tarinfo_eq(), members with a nonzero size must also
    have identical contents, and both archives must end together.
    AssertionError is raised at the first difference.

    Both file objects are closed before this function returns,
    whether or not the comparison succeeded.

    """
    try:
        tf1 = tarfile.TarFile("none", "r", tarfile1)
        tf2 = tarfile.TarFile("none", "r", tarfile2)
        tf2_iter = iter(tf2)

        for ti1 in tf1:
            # Raise explicitly rather than "assert 0": with assertions
            # disabled the loop would carry on without a valid ti2
            try: ti2 = tf2_iter.next()
            except StopIteration:
                raise AssertionError("Premature end to second tarfile, "
                                     "ti1.name = %s" % ti1.name)
            assert tarinfo_eq(ti1, ti2), "%s %s" % (ti1.name, ti2.name)
            if ti1.size != 0:
                fp1 = tf1.extractfile(ti1)
                buf1 = fp1.read()
                fp1.close()
                fp2 = tf2.extractfile(ti2)
                buf2 = fp2.read()
                fp2.close()
                assert buf1 == buf2, "Contents differ: %s" % ti1.name

        # The first archive is exhausted; the second must be as well
        try: ti2 = tf2_iter.next()
        except StopIteration: pass
        else:
            raise AssertionError("Premature end to first tarfile, "
                                 "ti2.name = %s" % ti2.name)
    finally:
        # Nested so that tarfile2 is closed even if closing tarfile1 fails
        try: tarfile1.close()
        finally: tarfile2.close()
 
228
 
 
229
def tarinfo_eq(ti1, ti2):
    """Return 1 if two tarinfo objects have equal metadata, 0 otherwise

    Compared in this order: name, size, mtime, mode, type, linkname
    (only when ti1 is a symbolic or hard link), uid and gid, uname
    and gname.  At the first difference a line with a label and the
    differing values is written to standard output and 0 is returned.

    """
    def report(label, *values):
        # One line, label and values separated by single spaces --
        # the text a print statement with the same items produces
        words = [label] + ["%s" % (value,) for value in values]
        sys.stdout.write(" ".join(words) + "\n")

    # Plain attributes that are compared and reported the same way
    for label, attr in (("Name:", "name"),
                        ("Size:", "size"),
                        ("Mtime:", "mtime"),
                        ("Mode:", "mode"),
                        ("Type:", "type")):
        val1, val2 = getattr(ti1, attr), getattr(ti2, attr)
        if val1 != val2:
            report(label, val1, val2)
            return 0

    # The types are equal at this point, so checking ti1 is enough
    if ti1.issym() or ti1.islnk():
        if ti1.linkname != ti2.linkname:
            report("Linkname:", ti1.linkname, ti2.linkname)
            return 0
    if ti1.uid != ti2.uid or ti1.gid != ti2.gid:
        report("IDs:", ti1.uid, ti2.uid, ti1.gid, ti2.gid)
        return 0
    if ti1.uname != ti2.uname or ti1.gname != ti2.gname:
        report("Owner names:", ti1.uname, ti2.uname, ti1.gname, ti2.gname)
        return 0
    return 1
 
256
 
 
257
# Run every test in this module when the file is executed as a script
if __name__ == "__main__":
    unittest.main()