~ed.so/duplicity/reuse-passphrase-for-signing-fix

« back to all changes in this revision

Viewing changes to testing/patchdirtest.py

  • Committer: loafman
  • Date: 2008-09-09 19:57:46 UTC
  • Revision ID: vcs-imports@canonical.com-20080909195746-9rbllpdu8ngitq2n
Untabify all files.  To compare against previous
versions use 'cvs diff -w' or 'diff -w'.

Show diffs side-by-side

added added

removed removed

Lines of Context:
7
7
config.setup()
8
8
 
9
9
class PatchingTest(unittest.TestCase):
10
 
        """Test patching"""
11
 
        def copyfileobj(self, infp, outfp):
12
 
                """Copy in fileobj to out, closing afterwards"""
13
 
                blocksize = 32 * 1024
14
 
                while 1:
15
 
                        buf = infp.read(blocksize)
16
 
                        if not buf: break
17
 
                        outfp.write(buf)
18
 
                assert not infp.close()
19
 
                assert not outfp.close()
20
 
 
21
 
        def deltmp(self):
22
 
                """Delete temporary directories"""
23
 
                assert not os.system("rm -rf testfiles/output")
24
 
                os.mkdir("testfiles/output")
25
 
 
26
 
        def test_total(self):
27
 
                """Test cycle on dirx"""
28
 
                self.total_sequence(['testfiles/dir1',
29
 
                                                         'testfiles/dir2',
30
 
                                                         'testfiles/dir3'])
31
 
 
32
 
        def get_sel(self, path):
33
 
                """Get selection iter over the given directory"""
34
 
                return selection.Select(path).set_iter()
35
 
 
36
 
        def total_sequence(self, filelist):
37
 
                """Test signatures, diffing, and patching on directory list"""
38
 
                assert len(filelist) >= 2
39
 
                self.deltmp()
40
 
                sig = Path("testfiles/output/sig.tar")
41
 
                diff = Path("testfiles/output/diff.tar")
42
 
                seq_path = Path("testfiles/output/sequence")            
43
 
                new_path, old_path = None, None # set below in for loop
44
 
 
45
 
                # Write initial full backup to diff.tar
46
 
                for dirname in filelist:
47
 
                        old_path, new_path = new_path, Path(dirname)
48
 
                        if old_path:
49
 
                                sigblock = diffdir.DirSig(self.get_sel(seq_path))
50
 
                                diffdir.write_block_iter(sigblock, sig)
51
 
                                deltablock = diffdir.DirDelta(self.get_sel(new_path),
52
 
                                                                                          sig.open("rb"))
53
 
                        else: deltablock = diffdir.DirFull(self.get_sel(new_path))
54
 
                        diffdir.write_block_iter(deltablock, diff)
55
 
 
56
 
                        patchdir.Patch(seq_path, diff.open("rb"))
57
 
                        #print "#########", seq_path, new_path
58
 
                        assert seq_path.compare_recursive(new_path, 1)
59
 
 
60
 
        def test_root(self):
61
 
                """Test changing uid/gid, devices"""
62
 
                self.deltmp()
63
 
                os.system("cp -a testfiles/root1 testfiles/output/sequence")
64
 
                seq_path = Path("testfiles/output/sequence")
65
 
                new_path = Path("testfiles/root2")
66
 
                sig = Path("testfiles/output/sig.tar")
67
 
                diff = Path("testfiles/output/diff.tar")
68
 
 
69
 
                diffdir.write_block_iter(diffdir.DirSig(self.get_sel(seq_path)), sig)
70
 
                deltablock = diffdir.DirDelta(self.get_sel(new_path), sig.open("rb"))
71
 
                diffdir.write_block_iter(deltablock, diff)
72
 
 
73
 
                patchdir.Patch(seq_path, diff.open("rb"))
74
 
 
75
 
                # since we are not running as root, don't even both comparing,
76
 
                # just make sure file5 exists and file4 doesn't.
77
 
                file5 = seq_path.append("file5")
78
 
                assert file5.isreg()
79
 
                file4 = seq_path.append("file4")
80
 
                assert file4.type is None
81
 
 
82
 
        def test_root2(self):
83
 
                """Again test files we don't have access to, this time Tar_WriteSig"""
84
 
                self.deltmp()
85
 
                sig_path = Path("testfiles/output/sig.sigtar")
86
 
                tar_path = Path("testfiles/output/tar.tar")
87
 
                basis_path = Path("testfiles/root1")
88
 
 
89
 
                deltablock = diffdir.DirFull_WriteSig(self.get_sel(basis_path),
90
 
                                                                                          sig_path.open("wb"))
91
 
                diffdir.write_block_iter(deltablock, tar_path)
92
 
                
93
 
        def test_block_tar(self):
94
 
                """Test building block tar from a number of files"""
95
 
                def get_fileobjs():
96
 
                        """Return iterator yielding open fileobjs of tar files"""
97
 
                        for i in range(1, 4):
98
 
                                p = Path("testfiles/blocktartest/test%d.tar" % i)
99
 
                                fp = p.open("rb")
100
 
                                yield fp
101
 
                                fp.close()
102
 
 
103
 
                tf = patchdir.TarFile_FromFileobjs(get_fileobjs())
104
 
                namelist = []
105
 
                for tarinfo in tf: namelist.append(tarinfo.name)
106
 
                for i in range(1, 6):
107
 
                        assert ("tmp/%d" % i) in namelist, namelist
108
 
 
109
 
        def test_doubledot_hole(self):
110
 
                """Test for the .. bug that lets tar overwrite parent dir"""
111
 
                self.deltmp()
112
 
 
113
 
                def make_bad_tar(filename):
114
 
                        """Write attack tarfile to filename"""
115
 
                        def iterate_one_pair(path):
116
 
                                """Iterate one (tarinfo, fp) pair
117
 
 
118
 
                                file object will be empty, and tarinfo will have path
119
 
                                "snapshot/../warning-security-error"
120
 
 
121
 
                                """
122
 
                                path.index = ("diff", "..", "warning-security-error")
123
 
                                ti = path.get_tarinfo()
124
 
                                fp = cStringIO.StringIO("")
125
 
                                yield (ti, fp)
126
 
                        assert not os.system("cat /dev/null >testfiles/output/file")
127
 
                        tf = tarfile.TarFromIterator(iterate_one_pair(
128
 
                                Path("testfiles/output/file")))
129
 
                        tfbuf = tf.read()
130
 
 
131
 
                        fout = open(filename, "wb")
132
 
                        fout.write(tfbuf)
133
 
                        assert not fout.close()
134
 
 
135
 
                self.deltmp()
136
 
                make_bad_tar("testfiles/output/bad.tar")
137
 
                os.mkdir("testfiles/output/temp")
138
 
 
139
 
                self.assertRaises(patchdir.PatchDirException, patchdir.Patch,
140
 
                                                  Path("testfiles/output/temp"),
141
 
                                                  open("testfiles/output/bad.tar"))
142
 
                assert not Path("testfiles/output/warning-security-error").exists()
 
10
    """Test patching"""
 
11
    def copyfileobj(self, infp, outfp):
 
12
        """Copy in fileobj to out, closing afterwards"""
 
13
        blocksize = 32 * 1024
 
14
        while 1:
 
15
            buf = infp.read(blocksize)
 
16
            if not buf: break
 
17
            outfp.write(buf)
 
18
        assert not infp.close()
 
19
        assert not outfp.close()
 
20
 
 
21
    def deltmp(self):
 
22
        """Delete temporary directories"""
 
23
        assert not os.system("rm -rf testfiles/output")
 
24
        os.mkdir("testfiles/output")
 
25
 
 
26
    def test_total(self):
 
27
        """Test cycle on dirx"""
 
28
        self.total_sequence(['testfiles/dir1',
 
29
                             'testfiles/dir2',
 
30
                             'testfiles/dir3'])
 
31
 
 
32
    def get_sel(self, path):
 
33
        """Get selection iter over the given directory"""
 
34
        return selection.Select(path).set_iter()
 
35
 
 
36
    def total_sequence(self, filelist):
 
37
        """Test signatures, diffing, and patching on directory list"""
 
38
        assert len(filelist) >= 2
 
39
        self.deltmp()
 
40
        sig = Path("testfiles/output/sig.tar")
 
41
        diff = Path("testfiles/output/diff.tar")
 
42
        seq_path = Path("testfiles/output/sequence")        
 
43
        new_path, old_path = None, None # set below in for loop
 
44
 
 
45
        # Write initial full backup to diff.tar
 
46
        for dirname in filelist:
 
47
            old_path, new_path = new_path, Path(dirname)
 
48
            if old_path:
 
49
                sigblock = diffdir.DirSig(self.get_sel(seq_path))
 
50
                diffdir.write_block_iter(sigblock, sig)
 
51
                deltablock = diffdir.DirDelta(self.get_sel(new_path),
 
52
                                              sig.open("rb"))
 
53
            else: deltablock = diffdir.DirFull(self.get_sel(new_path))
 
54
            diffdir.write_block_iter(deltablock, diff)
 
55
 
 
56
            patchdir.Patch(seq_path, diff.open("rb"))
 
57
            #print "#########", seq_path, new_path
 
58
            assert seq_path.compare_recursive(new_path, 1)
 
59
 
 
60
    def test_root(self):
 
61
        """Test changing uid/gid, devices"""
 
62
        self.deltmp()
 
63
        os.system("cp -a testfiles/root1 testfiles/output/sequence")
 
64
        seq_path = Path("testfiles/output/sequence")
 
65
        new_path = Path("testfiles/root2")
 
66
        sig = Path("testfiles/output/sig.tar")
 
67
        diff = Path("testfiles/output/diff.tar")
 
68
 
 
69
        diffdir.write_block_iter(diffdir.DirSig(self.get_sel(seq_path)), sig)
 
70
        deltablock = diffdir.DirDelta(self.get_sel(new_path), sig.open("rb"))
 
71
        diffdir.write_block_iter(deltablock, diff)
 
72
 
 
73
        patchdir.Patch(seq_path, diff.open("rb"))
 
74
 
 
75
        # since we are not running as root, don't even both comparing,
 
76
        # just make sure file5 exists and file4 doesn't.
 
77
        file5 = seq_path.append("file5")
 
78
        assert file5.isreg()
 
79
        file4 = seq_path.append("file4")
 
80
        assert file4.type is None
 
81
 
 
82
    def test_root2(self):
 
83
        """Again test files we don't have access to, this time Tar_WriteSig"""
 
84
        self.deltmp()
 
85
        sig_path = Path("testfiles/output/sig.sigtar")
 
86
        tar_path = Path("testfiles/output/tar.tar")
 
87
        basis_path = Path("testfiles/root1")
 
88
 
 
89
        deltablock = diffdir.DirFull_WriteSig(self.get_sel(basis_path),
 
90
                                              sig_path.open("wb"))
 
91
        diffdir.write_block_iter(deltablock, tar_path)
 
92
        
 
93
    def test_block_tar(self):
 
94
        """Test building block tar from a number of files"""
 
95
        def get_fileobjs():
 
96
            """Return iterator yielding open fileobjs of tar files"""
 
97
            for i in range(1, 4):
 
98
                p = Path("testfiles/blocktartest/test%d.tar" % i)
 
99
                fp = p.open("rb")
 
100
                yield fp
 
101
                fp.close()
 
102
 
 
103
        tf = patchdir.TarFile_FromFileobjs(get_fileobjs())
 
104
        namelist = []
 
105
        for tarinfo in tf: namelist.append(tarinfo.name)
 
106
        for i in range(1, 6):
 
107
            assert ("tmp/%d" % i) in namelist, namelist
 
108
 
 
109
    def test_doubledot_hole(self):
 
110
        """Test for the .. bug that lets tar overwrite parent dir"""
 
111
        self.deltmp()
 
112
 
 
113
        def make_bad_tar(filename):
 
114
            """Write attack tarfile to filename"""
 
115
            def iterate_one_pair(path):
 
116
                """Iterate one (tarinfo, fp) pair
 
117
 
 
118
                file object will be empty, and tarinfo will have path
 
119
                "snapshot/../warning-security-error"
 
120
 
 
121
                """
 
122
                path.index = ("diff", "..", "warning-security-error")
 
123
                ti = path.get_tarinfo()
 
124
                fp = cStringIO.StringIO("")
 
125
                yield (ti, fp)
 
126
            assert not os.system("cat /dev/null >testfiles/output/file")
 
127
            tf = tarfile.TarFromIterator(iterate_one_pair(
 
128
                Path("testfiles/output/file")))
 
129
            tfbuf = tf.read()
 
130
 
 
131
            fout = open(filename, "wb")
 
132
            fout.write(tfbuf)
 
133
            assert not fout.close()
 
134
 
 
135
        self.deltmp()
 
136
        make_bad_tar("testfiles/output/bad.tar")
 
137
        os.mkdir("testfiles/output/temp")
 
138
 
 
139
        self.assertRaises(patchdir.PatchDirException, patchdir.Patch,
 
140
                          Path("testfiles/output/temp"),
 
141
                          open("testfiles/output/bad.tar"))
 
142
        assert not Path("testfiles/output/warning-security-error").exists()
143
143
 
144
144
 
145
145
 
146
146
class index:
147
 
        """Used below to test the iter collation"""
148
 
        def __init__(self, index):
149
 
                self.index = index
 
147
    """Used below to test the iter collation"""
 
148
    def __init__(self, index):
 
149
        self.index = index
150
150
 
151
151
class CollateItersTest(unittest.TestCase):
152
 
        def test_collate(self):
153
 
                """Test collate_iters function"""
154
 
                indicies = map(index, [0,1,2,3])
155
 
                helper = lambda i: indicies[i]
156
 
 
157
 
                makeiter1 = lambda: iter(indicies)
158
 
                makeiter2 = lambda: iter(map(helper, [0,1,3]))
159
 
                makeiter3 = lambda: iter(map(helper, [1,2]))
160
 
 
161
 
                outiter = patchdir.collate_iters([makeiter1(), makeiter2()])
162
 
                assert Iter.equal(outiter,
163
 
                                                  iter([(indicies[0], indicies[0]),
164
 
                                                                (indicies[1], indicies[1]),
165
 
                                                                (indicies[2], None),
166
 
                                                                (indicies[3], indicies[3])]))
167
 
 
168
 
                assert Iter.equal(patchdir.collate_iters([makeiter1(),
169
 
                                                                                                 makeiter2(),
170
 
                                                                                                 makeiter3()]),
171
 
                                                  iter([(indicies[0], indicies[0], None),
172
 
                                                                (indicies[1], indicies[1], indicies[1]),
173
 
                                                                (indicies[2], None, indicies[2]),
174
 
                                                                (indicies[3], indicies[3], None)]), 1)
175
 
 
176
 
                assert Iter.equal(patchdir.collate_iters([makeiter1(), iter([])]),
177
 
                                                  iter(map(lambda i: (i, None), indicies)))
178
 
                assert Iter.equal(iter(map(lambda i: (i, None), indicies)),
179
 
                                                  patchdir.collate_iters([makeiter1(), iter([])]))
180
 
 
181
 
        def test_tuple(self):
182
 
                """Test indexed tuple"""
183
 
                i = patchdir.IndexedTuple((1,2,3), ("a", "b"))
184
 
                i2 = patchdir.IndexedTuple((), ("hello", "there", "how are you"))
185
 
 
186
 
                assert i[0] == "a"
187
 
                assert i[1] == "b"
188
 
                assert i2[1] == "there"
189
 
                assert len(i) == 2 and len(i2) == 3
190
 
                assert i2 < i, i2 < i
191
 
 
192
 
        def test_tuple_assignment(self):
193
 
                a, b, c = patchdir.IndexedTuple((), (1, 2, 3))
194
 
                assert a == 1
195
 
                assert b == 2
196
 
                assert c == 3
 
152
    def test_collate(self):
 
153
        """Test collate_iters function"""
 
154
        indicies = map(index, [0,1,2,3])
 
155
        helper = lambda i: indicies[i]
 
156
 
 
157
        makeiter1 = lambda: iter(indicies)
 
158
        makeiter2 = lambda: iter(map(helper, [0,1,3]))
 
159
        makeiter3 = lambda: iter(map(helper, [1,2]))
 
160
 
 
161
        outiter = patchdir.collate_iters([makeiter1(), makeiter2()])
 
162
        assert Iter.equal(outiter,
 
163
                          iter([(indicies[0], indicies[0]),
 
164
                                (indicies[1], indicies[1]),
 
165
                                (indicies[2], None),
 
166
                                (indicies[3], indicies[3])]))
 
167
 
 
168
        assert Iter.equal(patchdir.collate_iters([makeiter1(),
 
169
                                                 makeiter2(),
 
170
                                                 makeiter3()]),
 
171
                          iter([(indicies[0], indicies[0], None),
 
172
                                (indicies[1], indicies[1], indicies[1]),
 
173
                                (indicies[2], None, indicies[2]),
 
174
                                (indicies[3], indicies[3], None)]), 1)
 
175
 
 
176
        assert Iter.equal(patchdir.collate_iters([makeiter1(), iter([])]),
 
177
                          iter(map(lambda i: (i, None), indicies)))
 
178
        assert Iter.equal(iter(map(lambda i: (i, None), indicies)),
 
179
                          patchdir.collate_iters([makeiter1(), iter([])]))
 
180
 
 
181
    def test_tuple(self):
 
182
        """Test indexed tuple"""
 
183
        i = patchdir.IndexedTuple((1,2,3), ("a", "b"))
 
184
        i2 = patchdir.IndexedTuple((), ("hello", "there", "how are you"))
 
185
 
 
186
        assert i[0] == "a"
 
187
        assert i[1] == "b"
 
188
        assert i2[1] == "there"
 
189
        assert len(i) == 2 and len(i2) == 3
 
190
        assert i2 < i, i2 < i
 
191
 
 
192
    def test_tuple_assignment(self):
 
193
        a, b, c = patchdir.IndexedTuple((), (1, 2, 3))
 
194
        assert a == 1
 
195
        assert b == 2
 
196
        assert c == 3
197
197
 
198
198
 
199
199
class TestInnerFuncs(unittest.TestCase):
200
 
        """Test some other functions involved in patching"""
201
 
        def setUp(self):
202
 
                self.check_output()
203
 
 
204
 
        def check_output(self):
205
 
                """Make sure testfiles/output exists"""
206
 
                out = Path("testfiles/output")
207
 
                if not out.exists() and out.isdir(): out.mkdir()
208
 
                self.out = out
209
 
 
210
 
        def snapshot(self):
211
 
                """Make a snapshot ROPath, permissions 0600"""
212
 
                ss = self.out.append("snapshot")
213
 
                fout = ss.open("wb")
214
 
                fout.write("hello, world!")
215
 
                assert not fout.close()
216
 
                ss.chmod(0600)
217
 
                ss.difftype = "snapshot"
218
 
                return ss
219
 
        
220
 
        def get_delta(self, old_buf, new_buf):
221
 
                """Return delta buffer from old to new"""
222
 
                sigfile = librsync.SigFile(cStringIO.StringIO(old_buf))
223
 
                sig = sigfile.read()
224
 
                assert not sigfile.close()
225
 
 
226
 
                deltafile = librsync.DeltaFile(sig, cStringIO.StringIO(new_buf))
227
 
                deltabuf = deltafile.read()
228
 
                assert not deltafile.close()
229
 
                return deltabuf
230
 
 
231
 
        def delta1(self):
232
 
                """Make a delta ROPath, permissions 0640"""
233
 
                delta1 = self.out.append("delta1")
234
 
                fout = delta1.open("wb")
235
 
                fout.write(self.get_delta("hello, world!",
236
 
                                                                  "aonseuth aosetnuhaonsuhtansoetuhaoe"))
237
 
                assert not fout.close()
238
 
                delta1.chmod(0640)
239
 
                delta1.difftype = "diff"
240
 
                return delta1
241
 
 
242
 
        def delta2(self):
243
 
                """Make another delta ROPath, permissions 0644"""
244
 
                delta2 = self.out.append("delta1")
245
 
                fout = delta2.open("wb")
246
 
                fout.write(self.get_delta("aonseuth aosetnuhaonsuhtansoetuhaoe",
247
 
                                                                  "3499 34957839485792357 458348573"))
248
 
                assert not fout.close()
249
 
                delta2.chmod(0644)
250
 
                delta2.difftype = "diff"
251
 
                return delta2
252
 
 
253
 
        def deleted(self):
254
 
                """Make a deleted ROPath"""
255
 
                deleted = self.out.append("deleted")
256
 
                assert not deleted.exists()
257
 
                deleted.difftype = "deleted"
258
 
                return deleted
259
 
 
260
 
        def test_normalize(self):
261
 
                """Test normalizing a sequence of diffs"""
262
 
                ss = self.snapshot()
263
 
                d1 = self.delta1()
264
 
                d2 = self.delta2()
265
 
                de = self.deleted()
266
 
 
267
 
                seq1 = [ss, d1, d2]
268
 
                seq2 = [ss, d1, d2, de]
269
 
                seq3 = [de, ss, d1, d2]
270
 
                seq4 = [de, ss, d1, d2, ss]
271
 
                seq5 = [de, ss, d1, d2, ss, d1, d2]
272
 
                
273
 
                def try_seq(input_seq, correct_output_seq):
274
 
                        normed = patchdir.normalize_ps(input_seq)
275
 
                        assert normed == correct_output_seq, (normed, correct_output_eq)
276
 
 
277
 
                try_seq(seq1, seq1)
278
 
                try_seq(seq2, [de])
279
 
                try_seq(seq3, seq1)
280
 
                try_seq(seq4, [ss])
281
 
                try_seq(seq5, seq1)
282
 
 
283
 
        def test_patch_seq2ropath(self):
284
 
                """Test patching sequence"""
285
 
                def testseq(seq, perms, buf):
286
 
                        result = patchdir.patch_seq2ropath(seq)
287
 
                        assert result.getperms() == perms, (result.getperms(), perms)
288
 
                        fout = result.open("rb")
289
 
                        contents = fout.read()
290
 
                        assert not fout.close()
291
 
                        assert contents == buf, (contents, buf)
292
 
 
293
 
                testseq([self.snapshot()], 0600, "hello, world!")
294
 
                testseq([self.snapshot(), self.delta1()], 0640,
295
 
                                "aonseuth aosetnuhaonsuhtansoetuhaoe")
296
 
                testseq([self.snapshot(), self.delta1(), self.delta2()], 0644,
297
 
                                "3499 34957839485792357 458348573")
 
200
    """Test some other functions involved in patching"""
 
201
    def setUp(self):
 
202
        self.check_output()
 
203
 
 
204
    def check_output(self):
 
205
        """Make sure testfiles/output exists"""
 
206
        out = Path("testfiles/output")
 
207
        if not out.exists() and out.isdir(): out.mkdir()
 
208
        self.out = out
 
209
 
 
210
    def snapshot(self):
 
211
        """Make a snapshot ROPath, permissions 0600"""
 
212
        ss = self.out.append("snapshot")
 
213
        fout = ss.open("wb")
 
214
        fout.write("hello, world!")
 
215
        assert not fout.close()
 
216
        ss.chmod(0600)
 
217
        ss.difftype = "snapshot"
 
218
        return ss
 
219
    
 
220
    def get_delta(self, old_buf, new_buf):
 
221
        """Return delta buffer from old to new"""
 
222
        sigfile = librsync.SigFile(cStringIO.StringIO(old_buf))
 
223
        sig = sigfile.read()
 
224
        assert not sigfile.close()
 
225
 
 
226
        deltafile = librsync.DeltaFile(sig, cStringIO.StringIO(new_buf))
 
227
        deltabuf = deltafile.read()
 
228
        assert not deltafile.close()
 
229
        return deltabuf
 
230
 
 
231
    def delta1(self):
 
232
        """Make a delta ROPath, permissions 0640"""
 
233
        delta1 = self.out.append("delta1")
 
234
        fout = delta1.open("wb")
 
235
        fout.write(self.get_delta("hello, world!",
 
236
                                  "aonseuth aosetnuhaonsuhtansoetuhaoe"))
 
237
        assert not fout.close()
 
238
        delta1.chmod(0640)
 
239
        delta1.difftype = "diff"
 
240
        return delta1
 
241
 
 
242
    def delta2(self):
 
243
        """Make another delta ROPath, permissions 0644"""
 
244
        delta2 = self.out.append("delta1")
 
245
        fout = delta2.open("wb")
 
246
        fout.write(self.get_delta("aonseuth aosetnuhaonsuhtansoetuhaoe",
 
247
                                  "3499 34957839485792357 458348573"))
 
248
        assert not fout.close()
 
249
        delta2.chmod(0644)
 
250
        delta2.difftype = "diff"
 
251
        return delta2
 
252
 
 
253
    def deleted(self):
 
254
        """Make a deleted ROPath"""
 
255
        deleted = self.out.append("deleted")
 
256
        assert not deleted.exists()
 
257
        deleted.difftype = "deleted"
 
258
        return deleted
 
259
 
 
260
    def test_normalize(self):
 
261
        """Test normalizing a sequence of diffs"""
 
262
        ss = self.snapshot()
 
263
        d1 = self.delta1()
 
264
        d2 = self.delta2()
 
265
        de = self.deleted()
 
266
 
 
267
        seq1 = [ss, d1, d2]
 
268
        seq2 = [ss, d1, d2, de]
 
269
        seq3 = [de, ss, d1, d2]
 
270
        seq4 = [de, ss, d1, d2, ss]
 
271
        seq5 = [de, ss, d1, d2, ss, d1, d2]
 
272
        
 
273
        def try_seq(input_seq, correct_output_seq):
 
274
            normed = patchdir.normalize_ps(input_seq)
 
275
            assert normed == correct_output_seq, (normed, correct_output_eq)
 
276
 
 
277
        try_seq(seq1, seq1)
 
278
        try_seq(seq2, [de])
 
279
        try_seq(seq3, seq1)
 
280
        try_seq(seq4, [ss])
 
281
        try_seq(seq5, seq1)
 
282
 
 
283
    def test_patch_seq2ropath(self):
 
284
        """Test patching sequence"""
 
285
        def testseq(seq, perms, buf):
 
286
            result = patchdir.patch_seq2ropath(seq)
 
287
            assert result.getperms() == perms, (result.getperms(), perms)
 
288
            fout = result.open("rb")
 
289
            contents = fout.read()
 
290
            assert not fout.close()
 
291
            assert contents == buf, (contents, buf)
 
292
 
 
293
        testseq([self.snapshot()], 0600, "hello, world!")
 
294
        testseq([self.snapshot(), self.delta1()], 0640,
 
295
                "aonseuth aosetnuhaonsuhtansoetuhaoe")
 
296
        testseq([self.snapshot(), self.delta1(), self.delta2()], 0644,
 
297
                "3499 34957839485792357 458348573")
298
298
 
299
299
 
300
300
if __name__ == "__main__":
    # Run the tests in this module when executed as a script
    unittest.main()