9
9
class PatchingTest(unittest.TestCase):
11
def copyfileobj(self, infp, outfp):
12
"""Copy in fileobj to out, closing afterwards"""
15
buf = infp.read(blocksize)
18
assert not infp.close()
19
assert not outfp.close()
22
"""Delete temporary directories"""
23
assert not os.system("rm -rf testfiles/output")
24
os.mkdir("testfiles/output")
27
"""Test cycle on dirx"""
28
self.total_sequence(['testfiles/dir1',
32
def get_sel(self, path):
    """Build a selection over ``path`` and return its iterator.

    Constructs ``selection.Select(path)`` and hands back whatever its
    ``set_iter()`` call returns.
    """
    selector = selection.Select(path)
    return selector.set_iter()
36
def total_sequence(self, filelist):
37
"""Test signatures, diffing, and patching on directory list"""
38
assert len(filelist) >= 2
40
sig = Path("testfiles/output/sig.tar")
41
diff = Path("testfiles/output/diff.tar")
42
seq_path = Path("testfiles/output/sequence")
43
new_path, old_path = None, None # set below in for loop
45
# Write initial full backup to diff.tar
46
for dirname in filelist:
47
old_path, new_path = new_path, Path(dirname)
49
sigblock = diffdir.DirSig(self.get_sel(seq_path))
50
diffdir.write_block_iter(sigblock, sig)
51
deltablock = diffdir.DirDelta(self.get_sel(new_path),
53
else: deltablock = diffdir.DirFull(self.get_sel(new_path))
54
diffdir.write_block_iter(deltablock, diff)
56
patchdir.Patch(seq_path, diff.open("rb"))
57
#print "#########", seq_path, new_path
58
assert seq_path.compare_recursive(new_path, 1)
61
"""Test changing uid/gid, devices"""
63
os.system("cp -a testfiles/root1 testfiles/output/sequence")
64
seq_path = Path("testfiles/output/sequence")
65
new_path = Path("testfiles/root2")
66
sig = Path("testfiles/output/sig.tar")
67
diff = Path("testfiles/output/diff.tar")
69
diffdir.write_block_iter(diffdir.DirSig(self.get_sel(seq_path)), sig)
70
deltablock = diffdir.DirDelta(self.get_sel(new_path), sig.open("rb"))
71
diffdir.write_block_iter(deltablock, diff)
73
patchdir.Patch(seq_path, diff.open("rb"))
75
# since we are not running as root, don't even bother comparing,
76
# just make sure file5 exists and file4 doesn't.
77
file5 = seq_path.append("file5")
79
file4 = seq_path.append("file4")
80
assert file4.type is None
83
"""Again test files we don't have access to, this time Tar_WriteSig"""
85
sig_path = Path("testfiles/output/sig.sigtar")
86
tar_path = Path("testfiles/output/tar.tar")
87
basis_path = Path("testfiles/root1")
89
deltablock = diffdir.DirFull_WriteSig(self.get_sel(basis_path),
91
diffdir.write_block_iter(deltablock, tar_path)
93
def test_block_tar(self):
94
"""Test building block tar from a number of files"""
96
"""Return iterator yielding open fileobjs of tar files"""
98
p = Path("testfiles/blocktartest/test%d.tar" % i)
103
tf = patchdir.TarFile_FromFileobjs(get_fileobjs())
105
for tarinfo in tf: namelist.append(tarinfo.name)
106
for i in range(1, 6):
107
assert ("tmp/%d" % i) in namelist, namelist
109
def test_doubledot_hole(self):
110
"""Test for the .. bug that lets tar overwrite parent dir"""
113
def make_bad_tar(filename):
114
"""Write attack tarfile to filename"""
115
def iterate_one_pair(path):
116
"""Iterate one (tarinfo, fp) pair
118
file object will be empty, and tarinfo will have path
119
"snapshot/../warning-security-error"
122
path.index = ("diff", "..", "warning-security-error")
123
ti = path.get_tarinfo()
124
fp = cStringIO.StringIO("")
126
assert not os.system("cat /dev/null >testfiles/output/file")
127
tf = tarfile.TarFromIterator(iterate_one_pair(
128
Path("testfiles/output/file")))
131
fout = open(filename, "wb")
133
assert not fout.close()
136
make_bad_tar("testfiles/output/bad.tar")
137
os.mkdir("testfiles/output/temp")
139
self.assertRaises(patchdir.PatchDirException, patchdir.Patch,
140
Path("testfiles/output/temp"),
141
open("testfiles/output/bad.tar"))
142
assert not Path("testfiles/output/warning-security-error").exists()
11
def copyfileobj(self, infp, outfp):
12
"""Copy in fileobj to out, closing afterwards"""
15
buf = infp.read(blocksize)
18
assert not infp.close()
19
assert not outfp.close()
22
"""Delete temporary directories"""
23
assert not os.system("rm -rf testfiles/output")
24
os.mkdir("testfiles/output")
27
"""Test cycle on dirx"""
28
self.total_sequence(['testfiles/dir1',
32
def get_sel(self, path):
    """Build a selection over ``path`` and return its iterator.

    Constructs ``selection.Select(path)`` and hands back whatever its
    ``set_iter()`` call returns.
    """
    selector = selection.Select(path)
    return selector.set_iter()
36
def total_sequence(self, filelist):
37
"""Test signatures, diffing, and patching on directory list"""
38
assert len(filelist) >= 2
40
sig = Path("testfiles/output/sig.tar")
41
diff = Path("testfiles/output/diff.tar")
42
seq_path = Path("testfiles/output/sequence")
43
new_path, old_path = None, None # set below in for loop
45
# Write initial full backup to diff.tar
46
for dirname in filelist:
47
old_path, new_path = new_path, Path(dirname)
49
sigblock = diffdir.DirSig(self.get_sel(seq_path))
50
diffdir.write_block_iter(sigblock, sig)
51
deltablock = diffdir.DirDelta(self.get_sel(new_path),
53
else: deltablock = diffdir.DirFull(self.get_sel(new_path))
54
diffdir.write_block_iter(deltablock, diff)
56
patchdir.Patch(seq_path, diff.open("rb"))
57
#print "#########", seq_path, new_path
58
assert seq_path.compare_recursive(new_path, 1)
61
"""Test changing uid/gid, devices"""
63
os.system("cp -a testfiles/root1 testfiles/output/sequence")
64
seq_path = Path("testfiles/output/sequence")
65
new_path = Path("testfiles/root2")
66
sig = Path("testfiles/output/sig.tar")
67
diff = Path("testfiles/output/diff.tar")
69
diffdir.write_block_iter(diffdir.DirSig(self.get_sel(seq_path)), sig)
70
deltablock = diffdir.DirDelta(self.get_sel(new_path), sig.open("rb"))
71
diffdir.write_block_iter(deltablock, diff)
73
patchdir.Patch(seq_path, diff.open("rb"))
75
# since we are not running as root, don't even bother comparing,
76
# just make sure file5 exists and file4 doesn't.
77
file5 = seq_path.append("file5")
79
file4 = seq_path.append("file4")
80
assert file4.type is None
83
"""Again test files we don't have access to, this time Tar_WriteSig"""
85
sig_path = Path("testfiles/output/sig.sigtar")
86
tar_path = Path("testfiles/output/tar.tar")
87
basis_path = Path("testfiles/root1")
89
deltablock = diffdir.DirFull_WriteSig(self.get_sel(basis_path),
91
diffdir.write_block_iter(deltablock, tar_path)
93
def test_block_tar(self):
94
"""Test building block tar from a number of files"""
96
"""Return iterator yielding open fileobjs of tar files"""
98
p = Path("testfiles/blocktartest/test%d.tar" % i)
103
tf = patchdir.TarFile_FromFileobjs(get_fileobjs())
105
for tarinfo in tf: namelist.append(tarinfo.name)
106
for i in range(1, 6):
107
assert ("tmp/%d" % i) in namelist, namelist
109
def test_doubledot_hole(self):
110
"""Test for the .. bug that lets tar overwrite parent dir"""
113
def make_bad_tar(filename):
114
"""Write attack tarfile to filename"""
115
def iterate_one_pair(path):
116
"""Iterate one (tarinfo, fp) pair
118
file object will be empty, and tarinfo will have path
119
"snapshot/../warning-security-error"
122
path.index = ("diff", "..", "warning-security-error")
123
ti = path.get_tarinfo()
124
fp = cStringIO.StringIO("")
126
assert not os.system("cat /dev/null >testfiles/output/file")
127
tf = tarfile.TarFromIterator(iterate_one_pair(
128
Path("testfiles/output/file")))
131
fout = open(filename, "wb")
133
assert not fout.close()
136
make_bad_tar("testfiles/output/bad.tar")
137
os.mkdir("testfiles/output/temp")
139
self.assertRaises(patchdir.PatchDirException, patchdir.Patch,
140
Path("testfiles/output/temp"),
141
open("testfiles/output/bad.tar"))
142
assert not Path("testfiles/output/warning-security-error").exists()
147
"""Used below to test the iter collation"""
148
def __init__(self, index):
147
"""Used below to test the iter collation"""
148
def __init__(self, index):
151
151
class CollateItersTest(unittest.TestCase):
152
def test_collate(self):
153
"""Test collate_iters function"""
154
indicies = map(index, [0,1,2,3])
155
helper = lambda i: indicies[i]
157
makeiter1 = lambda: iter(indicies)
158
makeiter2 = lambda: iter(map(helper, [0,1,3]))
159
makeiter3 = lambda: iter(map(helper, [1,2]))
161
outiter = patchdir.collate_iters([makeiter1(), makeiter2()])
162
assert Iter.equal(outiter,
163
iter([(indicies[0], indicies[0]),
164
(indicies[1], indicies[1]),
166
(indicies[3], indicies[3])]))
168
assert Iter.equal(patchdir.collate_iters([makeiter1(),
171
iter([(indicies[0], indicies[0], None),
172
(indicies[1], indicies[1], indicies[1]),
173
(indicies[2], None, indicies[2]),
174
(indicies[3], indicies[3], None)]), 1)
176
assert Iter.equal(patchdir.collate_iters([makeiter1(), iter([])]),
177
iter(map(lambda i: (i, None), indicies)))
178
assert Iter.equal(iter(map(lambda i: (i, None), indicies)),
179
patchdir.collate_iters([makeiter1(), iter([])]))
181
def test_tuple(self):
182
"""Test indexed tuple"""
183
i = patchdir.IndexedTuple((1,2,3), ("a", "b"))
184
i2 = patchdir.IndexedTuple((), ("hello", "there", "how are you"))
188
assert i2[1] == "there"
189
assert len(i) == 2 and len(i2) == 3
190
assert i2 < i, i2 < i
192
def test_tuple_assignment(self):
193
a, b, c = patchdir.IndexedTuple((), (1, 2, 3))
152
def test_collate(self):
153
"""Test collate_iters function"""
154
indicies = map(index, [0,1,2,3])
155
helper = lambda i: indicies[i]
157
makeiter1 = lambda: iter(indicies)
158
makeiter2 = lambda: iter(map(helper, [0,1,3]))
159
makeiter3 = lambda: iter(map(helper, [1,2]))
161
outiter = patchdir.collate_iters([makeiter1(), makeiter2()])
162
assert Iter.equal(outiter,
163
iter([(indicies[0], indicies[0]),
164
(indicies[1], indicies[1]),
166
(indicies[3], indicies[3])]))
168
assert Iter.equal(patchdir.collate_iters([makeiter1(),
171
iter([(indicies[0], indicies[0], None),
172
(indicies[1], indicies[1], indicies[1]),
173
(indicies[2], None, indicies[2]),
174
(indicies[3], indicies[3], None)]), 1)
176
assert Iter.equal(patchdir.collate_iters([makeiter1(), iter([])]),
177
iter(map(lambda i: (i, None), indicies)))
178
assert Iter.equal(iter(map(lambda i: (i, None), indicies)),
179
patchdir.collate_iters([makeiter1(), iter([])]))
181
def test_tuple(self):
182
"""Test indexed tuple"""
183
i = patchdir.IndexedTuple((1,2,3), ("a", "b"))
184
i2 = patchdir.IndexedTuple((), ("hello", "there", "how are you"))
188
assert i2[1] == "there"
189
assert len(i) == 2 and len(i2) == 3
190
assert i2 < i, i2 < i
192
def test_tuple_assignment(self):
193
a, b, c = patchdir.IndexedTuple((), (1, 2, 3))
199
199
class TestInnerFuncs(unittest.TestCase):
200
"""Test some other functions involved in patching"""
204
def check_output(self):
    """Make sure testfiles/output exists"""
    out = Path("testfiles/output")
    # The parentheses matter: "not a and b" parses as "(not a) and b".
    # With that grouping the test is false whenever the path is missing
    # (presumably isdir() is false for a missing path), so mkdir() was
    # never reached.  Create the directory unless it is already there
    # as a directory.
    if not (out.exists() and out.isdir()):
        out.mkdir()
211
"""Make a snapshot ROPath, permissions 0600"""
212
ss = self.out.append("snapshot")
214
fout.write("hello, world!")
215
assert not fout.close()
217
ss.difftype = "snapshot"
220
def get_delta(self, old_buf, new_buf):
221
"""Return delta buffer from old to new"""
222
sigfile = librsync.SigFile(cStringIO.StringIO(old_buf))
224
assert not sigfile.close()
226
deltafile = librsync.DeltaFile(sig, cStringIO.StringIO(new_buf))
227
deltabuf = deltafile.read()
228
assert not deltafile.close()
232
"""Make a delta ROPath, permissions 0640"""
233
delta1 = self.out.append("delta1")
234
fout = delta1.open("wb")
235
fout.write(self.get_delta("hello, world!",
236
"aonseuth aosetnuhaonsuhtansoetuhaoe"))
237
assert not fout.close()
239
delta1.difftype = "diff"
243
"""Make another delta ROPath, permissions 0644"""
244
delta2 = self.out.append("delta1")
245
fout = delta2.open("wb")
246
fout.write(self.get_delta("aonseuth aosetnuhaonsuhtansoetuhaoe",
247
"3499 34957839485792357 458348573"))
248
assert not fout.close()
250
delta2.difftype = "diff"
254
"""Make a deleted ROPath"""
255
deleted = self.out.append("deleted")
256
assert not deleted.exists()
257
deleted.difftype = "deleted"
260
def test_normalize(self):
261
"""Test normalizing a sequence of diffs"""
268
seq2 = [ss, d1, d2, de]
269
seq3 = [de, ss, d1, d2]
270
seq4 = [de, ss, d1, d2, ss]
271
seq5 = [de, ss, d1, d2, ss, d1, d2]
273
def try_seq(input_seq, correct_output_seq):
    """Normalize ``input_seq`` and assert the result is as expected.

    input_seq -- sequence passed to patchdir.normalize_ps
    correct_output_seq -- value the normalized result must equal
    """
    normed = patchdir.normalize_ps(input_seq)
    # The failure message has to name the real parameter; the earlier
    # misspelling (correct_output_eq) raised NameError on a mismatch
    # instead of showing the two sequences.
    assert normed == correct_output_seq, (normed, correct_output_seq)
283
def test_patch_seq2ropath(self):
284
"""Test patching sequence"""
285
def testseq(seq, perms, buf):
    """Run patchdir.patch_seq2ropath on seq and verify the result.

    seq -- sequence handed to patchdir.patch_seq2ropath
    perms -- value the result's getperms() must equal
    buf -- data expected when the result is opened and read
    """
    patched = patchdir.patch_seq2ropath(seq)
    assert patched.getperms() == perms, (patched.getperms(), perms)
    infile = patched.open("rb")
    data = infile.read()
    assert not infile.close()
    assert data == buf, (data, buf)
293
testseq([self.snapshot()], 0600, "hello, world!")
294
testseq([self.snapshot(), self.delta1()], 0640,
295
"aonseuth aosetnuhaonsuhtansoetuhaoe")
296
testseq([self.snapshot(), self.delta1(), self.delta2()], 0644,
297
"3499 34957839485792357 458348573")
200
"""Test some other functions involved in patching"""
204
def check_output(self):
    """Make sure testfiles/output exists"""
    out = Path("testfiles/output")
    # The parentheses matter: "not a and b" parses as "(not a) and b".
    # With that grouping the test is false whenever the path is missing
    # (presumably isdir() is false for a missing path), so mkdir() was
    # never reached.  Create the directory unless it is already there
    # as a directory.
    if not (out.exists() and out.isdir()):
        out.mkdir()
211
"""Make a snapshot ROPath, permissions 0600"""
212
ss = self.out.append("snapshot")
214
fout.write("hello, world!")
215
assert not fout.close()
217
ss.difftype = "snapshot"
220
def get_delta(self, old_buf, new_buf):
221
"""Return delta buffer from old to new"""
222
sigfile = librsync.SigFile(cStringIO.StringIO(old_buf))
224
assert not sigfile.close()
226
deltafile = librsync.DeltaFile(sig, cStringIO.StringIO(new_buf))
227
deltabuf = deltafile.read()
228
assert not deltafile.close()
232
"""Make a delta ROPath, permissions 0640"""
233
delta1 = self.out.append("delta1")
234
fout = delta1.open("wb")
235
fout.write(self.get_delta("hello, world!",
236
"aonseuth aosetnuhaonsuhtansoetuhaoe"))
237
assert not fout.close()
239
delta1.difftype = "diff"
243
"""Make another delta ROPath, permissions 0644"""
244
delta2 = self.out.append("delta1")
245
fout = delta2.open("wb")
246
fout.write(self.get_delta("aonseuth aosetnuhaonsuhtansoetuhaoe",
247
"3499 34957839485792357 458348573"))
248
assert not fout.close()
250
delta2.difftype = "diff"
254
"""Make a deleted ROPath"""
255
deleted = self.out.append("deleted")
256
assert not deleted.exists()
257
deleted.difftype = "deleted"
260
def test_normalize(self):
261
"""Test normalizing a sequence of diffs"""
268
seq2 = [ss, d1, d2, de]
269
seq3 = [de, ss, d1, d2]
270
seq4 = [de, ss, d1, d2, ss]
271
seq5 = [de, ss, d1, d2, ss, d1, d2]
273
def try_seq(input_seq, correct_output_seq):
    """Normalize ``input_seq`` and assert the result is as expected.

    input_seq -- sequence passed to patchdir.normalize_ps
    correct_output_seq -- value the normalized result must equal
    """
    normed = patchdir.normalize_ps(input_seq)
    # The failure message has to name the real parameter; the earlier
    # misspelling (correct_output_eq) raised NameError on a mismatch
    # instead of showing the two sequences.
    assert normed == correct_output_seq, (normed, correct_output_seq)
283
def test_patch_seq2ropath(self):
284
"""Test patching sequence"""
285
def testseq(seq, perms, buf):
    """Run patchdir.patch_seq2ropath on seq and verify the result.

    seq -- sequence handed to patchdir.patch_seq2ropath
    perms -- value the result's getperms() must equal
    buf -- data expected when the result is opened and read
    """
    patched = patchdir.patch_seq2ropath(seq)
    assert patched.getperms() == perms, (patched.getperms(), perms)
    infile = patched.open("rb")
    data = infile.read()
    assert not infile.close()
    assert data == buf, (data, buf)
293
testseq([self.snapshot()], 0600, "hello, world!")
294
testseq([self.snapshot(), self.delta1()], 0640,
295
"aonseuth aosetnuhaonsuhtansoetuhaoe")
296
testseq([self.snapshot(), self.delta1(), self.delta2()], 0644,
297
"3499 34957839485792357 458348573")
300
300
if __name__ == "__main__":