~moritzm/duplicity/duplicity

« back to all changes in this revision

Viewing changes to duplicity/diffdir.py

  • Committer: bescoto
  • Date: 2002-10-29 01:49:46 UTC
  • Revision ID: vcs-imports@canonical.com-20021029014946-3m4rmm5plom7pl6q
Initial checkin

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright 2002 Ben Escoto
 
2
#
 
3
# This file is part of duplicity.
 
4
#
 
5
# duplicity is free software; you can redistribute it and/or modify it
 
6
# under the terms of the GNU General Public License as published by
 
7
# the Free Software Foundation, Inc., 675 Mass Ave, Cambridge MA
 
8
# 02139, USA; either version 2 of the License, or (at your option) any
 
9
# later version; incorporated herein by reference.
 
10
 
 
11
"""Functions for producing signatures and deltas of directories
 
12
 
 
13
Note that the main processes of this module have two parts.  In the
 
14
first, the signature or delta is constructed of a ROPath iterator.  In
 
15
the second, the ROPath iterator is put into tar block form.
 
16
 
 
17
"""
 
18
 
 
19
from __future__ import generators
 
20
import cStringIO, re, types
 
21
import tarfile, librsync, log
 
22
from path import *
 
23
from lazy import *
 
24
 
 
25
# Deltas will be broken up into volume_size chunks, so that an entire
 
26
# delta doesn't have to be read into memory.
 
27
volume_size = 1024 * 1024
 
28
 
 
29
class DiffDirException(Exception): pass
 
30
 
 
31
 
 
32
def DirSig(path_iter):
 
33
        """Alias for SigTarBlockIter below"""
 
34
        return SigTarBlockIter(path_iter)
 
35
 
 
36
 
 
37
def DirFull(path_iter):
 
38
        """Return a tarblock full backup of items in path_iter
 
39
 
 
40
        A full backup is just a diff starting from nothing (it may be less
 
41
        elegant than using a standard tar file, but we can be sure that it
 
42
        will be easy to split up the tar and make the volumes the same
 
43
        sizes).
 
44
 
 
45
        """
 
46
        return DirDelta(path_iter, cStringIO.StringIO(""))
 
47
 
 
48
def DirFull_WriteSig(path_iter, sig_outfp):
 
49
        """Return full backup like above, but also write signature to sig_outfp"""
 
50
        return DirDelta_WriteSig(path_iter, cStringIO.StringIO(""), sig_outfp)
 
51
 
 
52
 
 
53
def DirDelta(path_iter, dirsig_fileobj_list):
 
54
        """Produce tarblock diff given dirsig_fileobj_list and pathiter
 
55
 
 
56
        dirsig_fileobj_list should either be a tar fileobj or a list of
 
57
        those, sorted so the most recent is last.
 
58
 
 
59
        """
 
60
        if type(dirsig_fileobj_list) is types.ListType:
 
61
                sig_iter = combine_path_iters(map(sigtar2path_iter,
 
62
                                                                                  dirsig_fileobj_list))
 
63
        else: sig_iter = sigtar2path_iter(dirsig_fileobj_list)
 
64
        return DeltaTarBlockIter(get_delta_iter(path_iter, sig_iter))
 
65
 
 
66
def delta_iter_error_handler(exc, new_path, sig_path, sig_tar = None):
 
67
        """Called by get_delta_iter, report error in getting delta"""
 
68
        if new_path: index_string = new_path.get_relative_path()
 
69
        elif sig_path: index_string = sig_path.get_relative_path()
 
70
        else: assert 0, "Both new and sig are None for some reason"
 
71
        log.Log("Error %s getting delta for %s" % (str(exc), index_string), 2)
 
72
        return None
 
73
 
 
74
def get_delta_path(new_path, sig_path):
 
75
        """Get one delta_path, or None if error"""
 
76
        delta_path = new_path.get_rorpath()
 
77
        if not new_path.isreg():
 
78
                delta_path.difftype = "snapshot"
 
79
        elif not sig_path or not sig_path.isreg():
 
80
                delta_path.difftype = "snapshot"
 
81
                delta_path.setfileobj(new_path.open("rb"))
 
82
        else: # both new and sig exist and are regular files
 
83
                assert sig_path.difftype == "signature"
 
84
                delta_path.difftype = "diff"
 
85
                sigfp, newfp = sig_path.open("rb"), new_path.open("rb")
 
86
                delta_path.setfileobj(librsync.DeltaFile(sigfp, newfp))
 
87
        new_path.copy_attribs(delta_path)
 
88
        delta_path.stat.st_size = new_path.stat.st_size         
 
89
        return delta_path
 
90
 
 
91
def log_delta_path(delta_path):
 
92
        """Look at delta path and log delta"""
 
93
        if delta_path.difftype == "snapshot":
 
94
                log.Log("Generating delta - new file: %s" %
 
95
                                (delta_path.get_relative_path(),), 5)
 
96
        else: log.Log("Generating delta - changed file: %s" %
 
97
                                  (delta_path.get_relative_path(),), 5)
 
98
 
 
99
def get_delta_iter(new_iter, sig_iter):
 
100
        """Generate delta iter from new Path iter and sig Path iter.
 
101
 
 
102
        For each delta path of regular file type, path.difftype with be
 
103
        set to "snapshot", "diff".  sig_iter will probably iterate ROPaths
 
104
        instead of Paths.
 
105
 
 
106
        """
 
107
        collated = collate_iters(new_iter, sig_iter)
 
108
        for new_path, sig_path in collated:
 
109
                log.Log("Comparing %s and %s" % (new_path and new_path.index,
 
110
                                                                                 sig_path and sig_path.index), 6)
 
111
                if (not new_path or not new_path.type) and sig_path and sig_path.type:
 
112
                        log.Log("Generating delta - deleted file: %s" %
 
113
                                        (sig_path.get_relative_path(),), 5)
 
114
                        yield ROPath(sig_path.index)
 
115
                elif sig_path and new_path == sig_path: pass # no change, skip
 
116
                else:
 
117
                        delta_path = robust.check_common_error(delta_iter_error_handler,
 
118
                                                                                                   get_delta_path,
 
119
                                                                                                   (new_path, sig_path))
 
120
                        if delta_path: # if not, an error must have occurred
 
121
                                log_delta_path(delta_path)
 
122
                                yield delta_path
 
123
 
 
124
def sigtar2path_iter(sigtarobj):
 
125
        """Convert signature tar file object open for reading into path iter"""
 
126
        tf = tarfile.TarFile("Arbitrary Name", "r", sigtarobj)
 
127
        tf.debug = 2
 
128
        for tarinfo in tf:
 
129
                for prefix in ["signature/", "snapshot/", "deleted/"]:
 
130
                        if tarinfo.name.startswith(prefix):
 
131
                                # strip prefix and from name and set it to difftype
 
132
                                name, difftype = tarinfo.name[len(prefix):], prefix[:-1]
 
133
                                break
 
134
                else: raise DiffDirException("Bad tarinfo name %s" % (tarinfo.name,))
 
135
                        
 
136
                index = tuple(name.split("/"))
 
137
                if not index[-1]: index = index[:-1] # deal with trailing /, ""
 
138
                        
 
139
                ropath = ROPath(index)
 
140
                ropath.difftype = difftype
 
141
                if difftype == "signature" or difftype == "snapshot":
 
142
                        ropath.init_from_tarinfo(tarinfo)
 
143
                        if ropath.isreg(): ropath.setfileobj(tf.extractfile(tarinfo))
 
144
                yield ropath
 
145
        sigtarobj.close()
 
146
 
 
147
def collate_iters(riter1, riter2):
 
148
        """Collate two iterators.
 
149
 
 
150
        The elements yielded by each iterator must be have an index
 
151
        variable, and this function returns pairs (elem1, elem2), (elem1,
 
152
        None), or (None, elem2) two elements in a pair will have the same
 
153
        index, and earlier indicies are yielded later than later indicies.
 
154
 
 
155
        """
 
156
        relem1, relem2 = None, None
 
157
        while 1:
 
158
                if not relem1:
 
159
                        try: relem1 = riter1.next()
 
160
                        except StopIteration:
 
161
                                if relem2: yield (None, relem2)
 
162
                                for relem2 in riter2: yield (None, relem2)
 
163
                                break
 
164
                        index1 = relem1.index
 
165
                if not relem2:
 
166
                        try: relem2 = riter2.next()
 
167
                        except StopIteration:
 
168
                                if relem1: yield (relem1, None)
 
169
                                for relem1 in riter1: yield (relem1, None)
 
170
                                break
 
171
                        index2 = relem2.index
 
172
 
 
173
                if index1 < index2:
 
174
                        yield (relem1, None)
 
175
                        relem1 = None
 
176
                elif index1 == index2:
 
177
                        yield (relem1, relem2)
 
178
                        relem1, relem2 = None, None
 
179
                else: # index2 is less
 
180
                        yield (None, relem2)
 
181
                        relem2 = None
 
182
 
 
183
def combine_path_iters(path_iter_list):
 
184
        """Produce new iterator by combining the iterators in path_iter_list
 
185
 
 
186
        This new iter will iterate every path that is in path_iter_list in
 
187
        order of increasing index.  If multiple iterators in
 
188
        path_iter_list yield paths with the same index, combine_path_iters
 
189
        will discard all paths but the one yielded by the last path_iter.
 
190
 
 
191
        This is used to combine signature iters, as the output will be a
 
192
        full up-to-date signature iter.
 
193
 
 
194
        """
 
195
        path_iter_list = path_iter_list[:] # copy before destructive reverse
 
196
        path_iter_list.reverse()
 
197
 
 
198
        def get_triple(iter_index):
 
199
                """Represent the next element as a triple, to help sorting"""
 
200
                try: path = path_iter_list[iter_index].next()
 
201
                except StopIteration: return None
 
202
                return (path.index, iter_index, path)
 
203
 
 
204
        def refresh_triple_list(triple_list):
 
205
                """Update all elements with path_index same as first element"""
 
206
                path_index = triple_list[0][0]
 
207
                iter_index = 0
 
208
                while iter_index < len(triple_list):
 
209
                        old_triple = triple_list[iter_index]
 
210
                        if old_triple[0] == path_index:
 
211
                                new_triple = get_triple(old_triple[1])
 
212
                                if new_triple:
 
213
                                        triple_list[iter_index] = new_triple
 
214
                                        iter_index += 1
 
215
                                else: del triple_list[iter_index]
 
216
                        else: break # assumed triple_list sorted, so can exit now
 
217
                        
 
218
        triple_list = filter(lambda x: x, map(get_triple,
 
219
                                                                                  range(len(path_iter_list))))
 
220
        while triple_list:
 
221
                triple_list.sort()
 
222
                yield triple_list[0][2]
 
223
                refresh_triple_list(triple_list)
 
224
 
 
225
 
 
226
def DirDelta_WriteSig(path_iter, sig_infp_list, newsig_outfp):
 
227
        """Like DirDelta but also write signature into sig_fileobj
 
228
 
 
229
        Like DirDelta, sig_infp_list can be a tar fileobj or a sorted list
 
230
        of those.  A signature will only be written to newsig_outfp if it
 
231
        is different from (the combined) sig_infp_list.
 
232
 
 
233
        """
 
234
        if type(sig_infp_list) is types.ListType:
 
235
                sig_path_iter = combine_path_iters(map(sigtar2path_iter,
 
236
                                                                                           sig_infp_list))
 
237
        else: sig_path_iter = sigtar2path_iter(sig_infp_list)
 
238
        delta_iter = get_delta_iter_w_sig(path_iter, sig_path_iter, newsig_outfp)
 
239
        return DeltaTarBlockIter(delta_iter)
 
240
 
 
241
def get_delta_iter_w_sig(path_iter, sig_path_iter, sig_fileobj):
 
242
        """Like get_delta_iter but also write signatures to sig_fileobj"""
 
243
        collated = collate_iters(path_iter, sig_path_iter)
 
244
        sigTarFile = tarfile.TarFile("arbitrary", "w", sig_fileobj)
 
245
        for new_path, sig_path in collated:
 
246
                log.Log("Comparing %s and %s" % (new_path and new_path.index,
 
247
                                                                                 sig_path and sig_path.index), 6)
 
248
                if not new_path or not new_path.type: # file doesn't exist
 
249
                        if sig_path and sig_path.exists(): # but signature says it did
 
250
                                log.Log("Generating delta - deleted file: %s" %
 
251
                                                (sig_path.get_relative_path(),), 5)
 
252
                                ti = ROPath(sig_path.index).get_tarinfo()
 
253
                                ti.name = "deleted/" + "/".join(sig_path.index)
 
254
                                sigTarFile.addfile(ti)
 
255
                                yield ROPath(sig_path.index)
 
256
                elif not sig_path or new_path != sig_path:
 
257
                        # Must calculate new signature and create delta
 
258
                        delta_path = robust.check_common_error(
 
259
                                delta_iter_error_handler, get_delta_path_w_sig,
 
260
                                (new_path, sig_path, sigTarFile))
 
261
                        if delta_path:
 
262
                                log_delta_path(delta_path)
 
263
                                yield delta_path
 
264
        sigTarFile.close()
 
265
 
 
266
def get_delta_path_w_sig(new_path, sig_path, sigTarFile):
 
267
        """Return new delta_path which, when read, writes sig to sig_fileobj"""
 
268
        assert new_path
 
269
        ti = new_path.get_tarinfo()
 
270
        index = new_path.index
 
271
        delta_path = new_path.get_rorpath()
 
272
        log.Log("Getting delta of %s and %s" % (new_path, sig_path), 7)
 
273
 
 
274
        def callback(sig_string):
 
275
                """Callback activated when FileWithSignature read to end"""
 
276
                ti.size = len(sig_string)
 
277
                ti.name = "signature/" + "/".join(index)
 
278
                sigTarFile.addfile(ti, cStringIO.StringIO(sig_string))
 
279
 
 
280
        if new_path.isreg() and sig_path and sig_path.difftype == "signature":
 
281
                delta_path.difftype = "diff"
 
282
                old_sigfp = sig_path.open("rb")
 
283
                newfp = FileWithSignature(new_path.open("rb"), callback)
 
284
                delta_path.setfileobj(librsync.DeltaFile(old_sigfp, newfp))
 
285
        else:
 
286
                delta_path.difftype = "snapshot"
 
287
                ti.name = "snapshot/" + "/".join(index) 
 
288
                if not new_path.isreg(): sigTarFile.addfile(ti)
 
289
                else: delta_path.setfileobj(FileWithSignature(new_path.open("rb"),
 
290
                                                                                                          callback))
 
291
        new_path.copy_attribs(delta_path)
 
292
        delta_path.stat.st_size = new_path.stat.st_size
 
293
        return delta_path
 
294
 
 
295
 
 
296
class FileWithSignature:
 
297
        """File-like object which also computes signature as it is read"""
 
298
        blocksize = 32 * 1024
 
299
        def __init__(self, infile, callback, *extra_args):
 
300
                """FileTee initializer
 
301
 
 
302
                The object will act like infile, but whenever it is read it
 
303
                add infile's data to a SigGenerator object.  When the file has
 
304
                been read to the end the callback will be called with the
 
305
                calculated signature, and any extra_args if given.
 
306
 
 
307
                """
 
308
                self.infile, self.callback = infile, callback
 
309
                self.sig_gen = librsync.SigGenerator()
 
310
                self.activated_callback = None
 
311
                self.extra_args = extra_args
 
312
 
 
313
        def read(self, length = -1):
 
314
                buf = self.infile.read(length)
 
315
                self.sig_gen.update(buf)
 
316
                return buf
 
317
 
 
318
        def close(self):
 
319
                # Make sure all of infile read
 
320
                if not self.activated_callback:
 
321
                        while self.read(self.blocksize): pass
 
322
                        self.activated_callback = 1
 
323
                        self.callback(self.sig_gen.getsig(), *self.extra_args)
 
324
                return self.infile.close()
 
325
 
 
326
 
 
327
class TarBlock:
 
328
        """Contain information to add next file to tar"""
 
329
        def __init__(self, index, data):
 
330
                """TarBlock initializer - just store data"""
 
331
                self.index = index
 
332
                self.data = data
 
333
 
 
334
class TarBlockIter:
 
335
        """A bit like an iterator, yield tar blocks
 
336
 
 
337
        We use this class because the process that writes the tar file in
 
338
        blocks may need to know how big the next block is, and it is also
 
339
        useful to know the index of the current file.
 
340
 
 
341
        """
 
342
        def __init__(self, input_iter):
 
343
                """TarBlockIter initializer"""
 
344
                self.input_iter = input_iter
 
345
                self.next_value = None # holds next value if available
 
346
                self.next_set = None # True iff next value is loaded
 
347
                self.at_end = None
 
348
                self.offset = 0l # total length of data read
 
349
                self.process_waiting = None # process_continued has more blocks
 
350
 
 
351
                # We need to instantiate a dummy TarFile just to get access to
 
352
                # some of the functions like _get_full_headers.
 
353
                self.tf = tarfile.TarFromIterator(None)
 
354
 
 
355
        def tarinfo2tarblock(self, index, tarinfo, file_data = ""):
 
356
                """Make tarblock out of tarinfo and file data"""
 
357
                tarinfo.size = len(file_data)
 
358
                headers = self.tf._get_full_headers(tarinfo)
 
359
                blocks, remainder = divmod(tarinfo.size, tarfile.BLOCKSIZE)
 
360
                if remainder > 0: filler_data = "\0" * (tarfile.BLOCKSIZE - remainder)
 
361
                else: filler_data = ""
 
362
                return TarBlock(index, "%s%s%s" % (headers, file_data, filler_data))
 
363
 
 
364
        def process(self, val):
 
365
                """Turn next value of input_iter into a TarBlock"""
 
366
                XXX # this should be subclassed
 
367
 
 
368
        def process_continued(self):
 
369
                """Get more tarblocks
 
370
 
 
371
                If processing val above would produce more than one TarBlock,
 
372
                get the rest of them by calling process_continue.
 
373
 
 
374
                """
 
375
                assert self.process_waiting
 
376
                XXX # subclass this also
 
377
 
 
378
        def set_next(self):
 
379
                """Get value of next element"""
 
380
                if self.process_waiting: self.next_value = self.process_continued()
 
381
                else:
 
382
                        try: input = self.input_iter.next()
 
383
                        except StopIteration: self.at_end = 1
 
384
                        else: self.next_value = self.process(input)
 
385
                self.next_set = 1
 
386
 
 
387
        def next(self):
 
388
                """Return next value, update offset, and clear stored info"""
 
389
                if not self.next_set: self.set_next()
 
390
                if self.at_end: raise StopIteration
 
391
                result = self.next_value
 
392
                self.next_value, self.next_set = None, None
 
393
                self.offset += len(result.data)
 
394
                self.previous_index = result.index
 
395
                return result
 
396
                
 
397
        def peek(self):
 
398
                """Return next element without discarding, or None if at end"""
 
399
                if not self.next_set: self.set_next()
 
400
                if self.at_end: return None
 
401
                else: return self.next_value
 
402
                
 
403
        def get_previous_index(self):
 
404
                """Return index of last TarBlock given by self.next()"""
 
405
                return self.previous_index
 
406
 
 
407
        def get_footer(self):
 
408
                """Return closing string for tarfile, reset offset"""
 
409
                blocks, remainder = divmod(self.offset, tarfile.RECORDSIZE)
 
410
                self.offset = 0l
 
411
                return '\0' * (tarfile.RECORDSIZE - remainder) # remainder can be 0
 
412
 
 
413
        def __iter__(self): return self
 
414
 
 
415
class SigTarBlockIter(TarBlockIter):
 
416
        """TarBlockIter that yields blocks of a signature tar from path_iter"""
 
417
        def process(self, path):
 
418
                """Return associated signature TarBlock from path"""
 
419
                ti = path.get_tarinfo()
 
420
                if path.isreg():
 
421
                        sfp = librsync.SigFile(path.open("rb"))
 
422
                        sigbuf = sfp.read()
 
423
                        sfp.close()
 
424
                        ti.name = "signature/" + "/".join(path.index)
 
425
                        return self.tarinfo2tarblock(path.index, ti, sigbuf)
 
426
                else:
 
427
                        ti.name = "snapshot/" + "/".join(path.index)
 
428
                        return self.tarinfo2tarblock(path.index, ti)
 
429
 
 
430
class DeltaTarBlockIter(TarBlockIter):
 
431
        """TarBlockIter that yields parts of a deltatar file
 
432
 
 
433
        Unlike SigTarBlockIter, the argument to __init__ is a
 
434
        delta_path_iter, so the delta information has already been
 
435
        calculated.  The blocks produced by this class should not have
 
436
        data bigger than self.len_limit.
 
437
 
 
438
        """
 
439
        len_limit = 1024 * 1024
 
440
        def process(self, delta_ropath):
 
441
                """Get a tarblock from delta_ropath"""
 
442
                def add_prefix(tarinfo, prefix):
 
443
                        """Add prefix to the name of a tarinfo file"""
 
444
                        if tarinfo.name == ".": tarinfo.name = prefix + "/"
 
445
                        else: tarinfo.name = "%s/%s" % (prefix, tarinfo.name)
 
446
 
 
447
                ti = delta_ropath.get_tarinfo()
 
448
                index = delta_ropath.index
 
449
 
 
450
                # Return blocks of deleted files or fileless snapshots
 
451
                if not delta_ropath.type or not delta_ropath.fileobj:
 
452
                        if not delta_ropath.type: add_prefix(ti, "deleted")
 
453
                        else:
 
454
                                assert delta_ropath.difftype == "snapshot"
 
455
                                add_prefix(ti, "snapshot")
 
456
                        return self.tarinfo2tarblock(index, ti)
 
457
                        
 
458
                # Now handle single volume block case
 
459
                fp = delta_ropath.open("rb")
 
460
                data, last_block = self.get_data_block(fp)
 
461
                if last_block:
 
462
                        if delta_ropath.difftype == "snapshot": add_prefix(ti, "snapshot")
 
463
                        elif delta_ropath.difftype == "diff": add_prefix(ti, "diff")
 
464
                        else: assert 0, "Unknown difftype"
 
465
                        return self.tarinfo2tarblock(index, ti, data)
 
466
 
 
467
                # Finally, do multivol snapshot or diff case
 
468
                full_name = "multivol_%s/%s" % (delta_ropath.difftype, ti.name)
 
469
                ti.name = full_name + "/1"
 
470
                self.process_prefix = full_name
 
471
                self.process_fp = fp
 
472
                self.process_ropath = delta_ropath
 
473
                self.process_waiting = 1
 
474
                self.process_next_vol_number = 2
 
475
                return self.tarinfo2tarblock(index, ti, data)
 
476
 
 
477
        def get_data_block(self, fp):
 
478
                """Return pair (next data block, boolean last data block)"""
 
479
                buf = fp.read(self.len_limit)
 
480
                if len(buf) < self.len_limit:
 
481
                        if fp.close(): raise DiffDirException("Error closing file")
 
482
                        return (buf, 1)
 
483
                else: return (buf, None)
 
484
 
 
485
        def process_continued(self):
 
486
                """Return next volume in multivol diff or snapshot"""
 
487
                assert self.process_waiting
 
488
                ropath = self.process_ropath
 
489
                ti, index = ropath.get_tarinfo(), ropath.index
 
490
                ti.name = "%s/%d" % (self.process_prefix, self.process_next_vol_number)
 
491
                data, last_block = self.get_data_block(self.process_fp)
 
492
                if last_block:
 
493
                        self.process_prefix = None
 
494
                        self.process_fp = None
 
495
                        self.process_ropath = None
 
496
                        self.process_waiting = None
 
497
                        self.process_next_vol_number = None
 
498
                else: self.process_next_vol_number += 1
 
499
                return self.tarinfo2tarblock(index, ti, data)
 
500
 
 
501
 
 
502
def write_block_iter(block_iter, out_obj):
 
503
        """Write block_iter to filename, path, or file object"""
 
504
        if isinstance(out_obj, Path): fp = open(out_obj.name, "wb")
 
505
        elif type(out_obj) is types.StringType: fp = open(out_obj, "wb")
 
506
        else: fp = out_obj
 
507
        for block in block_iter: fp.write(block.data)
 
508
        fp.write(block_iter.get_footer())
 
509
        assert not fp.close()
 
510
        if isinstance(out_obj, Path): out_obj.setdata()
 
511