# Copyright 2002 Ben Escoto
#
# This file is part of duplicity.
#
# duplicity is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by
# the Free Software Foundation, Inc., 675 Mass Ave, Cambridge MA
# 02139, USA; either version 2 of the License, or (at your option) any
# later version; incorporated herein by reference.
11
"""Define some lazy data structures and functions acting on them"""
13
from __future__ import generators
14
import os, stat, types, sys
19
"""Hold static methods for the manipulation of lazy iterators"""
21
def filter(predicate, iterator):
    """Like filter in a lazy functional programming language"""
    # The loop header was lost to extraction garbling; restored from
    # the surviving body: lazily yield only elements that satisfy
    # predicate.  Deliberately shadows the builtin (original API is
    # Iter.filter).
    for i in iterator:
        if predicate(i): yield i
26
def map(function, iterator):
    """Like map in a lazy functional programming language"""
    # Lazily apply function to each element; nothing is consumed until
    # the caller iterates the result.
    for i in iterator: yield function(i)
30
def foreach(function, iterator):
    """Run function on each element in iterator"""
    # Eager side-effect driver: consumes the whole iterator, discards
    # the results, and returns None.
    for i in iterator: function(i)
35
"""Lazily concatenate iterators"""
37
for i in iter: yield i
39
def cat2(iter_of_iters):
    """Lazily concatenate iterators, iterated by big iterator"""
    # Like cat, but the sub-iterators themselves come from an iterator,
    # so nothing is materialized up front.
    for iter in iter_of_iters:
        for i in iter: yield i
45
"""True if iterator has length 0"""
46
for i in iter: return None
49
def equal(iter1, iter2, verbose = None, operator = lambda x, y: x == y):
    """True if iterator 1 has same elements as iterator 2

    Use equality operator, or == if it is unspecified.

    """
    # NOTE(review): the main loop and the return statements were lost
    # to extraction garbling and have been reconstructed -- confirm
    # against VCS.  The diagnostic prints use the parenthesized
    # single-argument form, which behaves identically on Python 2.
    for i1 in iter1:
        try: i2 = iter2.next()
        except StopIteration:
            # iter2 ran out first: different lengths, not equal
            if verbose: print("End when i1 = %s" % (i1,))
            return None
        if not operator(i1, i2):
            if verbose: print("%s not equal to %s" % (i1, i2))
            return None
    try: i2 = iter2.next()
    except StopIteration: return 1
    # iter2 is longer than iter1: not equal
    if verbose: print("End when i2 = %s" % (i2,))
    return None
69
"""True if any element in iterator is true. Short circuiting"""
76
"""True if all elements in iterator are true. Short circuiting"""
83
"""Return length of iterator"""
87
except StopIteration: return i
90
def foldr(f, default, iter):
    """foldr the "fundamental list recursion operator"?"""
    # Take the head via the Python 2 iterator protocol; an exhausted
    # iterator terminates the recursion with the default value.
    # (Local renamed from "next" to avoid shadowing the builtin.)
    try: head = iter.next()
    except StopIteration: return default
    # Recurses through the class attribute: these functions live in
    # class Iter upstream, whose header was lost in extraction.
    return f(head, Iter.foldr(f, default, iter))
96
def foldl(f, default, iter):
    """the fundamental list iteration operator.."""
    # The enclosing loop header was lost to extraction garbling;
    # restored: consume the iterator (Python 2 .next() protocol),
    # folding each element into the accumulator.  (Local renamed from
    # "next" to avoid shadowing the builtin.)
    while 1:
        try: item = iter.next()
        except StopIteration: return default
        default = f(default, item)
103
def multiplex(iter, num_of_forks, final_func = None, closing_func = None):
    """Split a single iterater into a number of streams

    The return val will be a list with length num_of_forks, each
    of which will be an iterator like iter.  final_func is the
    function that will be called on each element in iter just as
    it is being removed from the buffer.  closing_func is called
    when all the streams are finished.

    """
    # NOTE(review): several lines (buffer init, closing_func call,
    # raise, blen bookkeeping, returns) were lost to extraction
    # garbling and reconstructed from the surviving code -- confirm
    # against VCS.
    if num_of_forks == 2 and not final_func and not closing_func:
        # Special 2-way case handled by the optimized class below
        im2 = IterMultiplex2(iter)
        return (im2.yielda(), im2.yieldb())
    if not final_func: final_func = lambda i: None
    if not closing_func: closing_func = lambda: None

    # buffer is a list of elements that some iterators need and others
    # have already consumed; newest element is at index 0.
    buffer = []

    # buffer[forkposition[i]] is the next element yieled by iterator
    # i.  If it is -1, yield from the original iter
    starting_forkposition = [-1] * num_of_forks
    forkposition = starting_forkposition[:]
    # One-element list so the nested function can mutate the flag
    called_closing_func = [None]

    def get_next(fork_num):
        """Return the next element requested by fork_num"""
        if forkposition[fork_num] == -1:
            try: buffer.insert(0, iter.next())
            except StopIteration:
                # call closing_func if necessary
                if (forkposition == starting_forkposition and
                    not called_closing_func[0]):
                    closing_func()
                    # NOTE(review): the surviving line stores None,
                    # which remains falsy -- verify this matches the
                    # upstream intent.
                    called_closing_func[0] = None
                raise StopIteration
            for i in range(num_of_forks): forkposition[i] += 1

        return_val = buffer[forkposition[fork_num]]
        forkposition[fork_num] -= 1

        blen = len(buffer)
        if not (blen-1) in forkposition:
            # Last position in buffer no longer needed
            assert forkposition[fork_num] == blen-2
            final_func(buffer[blen-1])
            del buffer[blen-1]
        return return_val

    def make_iterator(fork_num):
        while(1): yield get_next(fork_num)

    return tuple(map(make_iterator, range(num_of_forks)))
161
class IterMultiplex2:
    """Multiplex an iterator into 2 parts

    This is a special optimized case of the Iter.multiplex function,
    used when there is no closing_func or final_func, and we only want
    to split it into 2.  By profiling, this is a time sensitive class.

    """
    # NOTE(review): method headers, loop lines, and yields were lost to
    # extraction garbling and reconstructed around the surviving
    # lines -- confirm against VCS.
    def __init__(self, iter):
        self.a_leading_by = 0 # How many places a is ahead of b
        self.buffer = []      # elements a has seen that b has not (or vice versa)
        self.iter = iter

    def yielda(self):
        """Return first iterator"""
        buf, iter = self.buffer, self.iter
        while(1):
            if self.a_leading_by >= 0: # a is in front, add new element
                elem = iter.next() # exception will be passed
                buf.append(elem)
            else: elem = buf.pop(0) # b is in front, subtract an element
            self.a_leading_by += 1
            yield elem

    def yieldb(self):
        """Return second iterator"""
        buf, iter = self.buffer, self.iter
        while(1):
            if self.a_leading_by <= 0: # b is in front, add new element
                elem = iter.next() # exception will be passed
                buf.append(elem)
            else: elem = buf.pop(0) # a is in front, subtract an element
            self.a_leading_by -= 1
            yield elem
197
class IterTreeReducer:
    """Tree style reducer object for iterator - stolen from rdiff-backup

    The indicies of a RORPIter form a tree type structure.  This class
    can be used on each element of an iter in sequence and the result
    will be as if the corresponding tree was reduced.  This tries to
    bridge the gap between the tree nature of directories, and the
    iterator nature of the connection between hosts and the temporal
    order in which the files are processed.

    This will usually be used by subclassing ITRBranch below and then
    call the initializer below with the new class.

    """
    # NOTE(review): loop headers, del/return statements, and the Finish
    # header were lost to extraction garbling and reconstructed from
    # the rdiff-backup upstream -- confirm against VCS.
    def __init__(self, branch_class, branch_args):
        """ITR initializer"""
        self.branch_class = branch_class
        self.branch_args = branch_args
        self.index = None  # last index processed; None until first call
        self.root_branch = branch_class(*branch_args)
        self.branches = [self.root_branch]

    def finish_branches(self, index):
        """Run Finish() on all branches index has passed

        When we pass out of a branch, delete it and process it with
        the parent.  The innermost branches will be the last in the
        list.  Return None if we are out of the entire tree, and 1
        otherwise.

        """
        branches = self.branches
        while 1:
            to_be_finished = branches[-1]
            base_index = to_be_finished.base_index
            if base_index != index[:len(base_index)]:
                # out of the tree, finish with to_be_finished
                to_be_finished.call_end_proc()
                del branches[-1]
                if not branches: return None
                branches[-1].branch_process(to_be_finished)
            else: return 1

    def add_branch(self):
        """Return branch of type self.branch_class, add to branch list"""
        branch = self.branch_class(*self.branch_args)
        self.branches.append(branch)
        return branch

    def process_w_branch(self, index, branch, args):
        """Run start_process on latest branch"""
        # robust is a duplicity module imported at file level elsewhere
        robust.check_common_error(branch.on_error,
                                  branch.start_process, args)
        if not branch.caught_exception: branch.start_successful = 1
        branch.base_index = index

    def Finish(self):
        """Call at end of sequence to tie everything up"""
        while 1:
            to_be_finished = self.branches.pop()
            to_be_finished.call_end_proc()
            if not self.branches: break
            self.branches[-1].branch_process(to_be_finished)

    def __call__(self, *args):
        """Process args, where args[0] is current position in iterator

        Returns true if args successfully processed, false if index is
        not in the current tree and thus the final result is
        available.

        Also note below we set self.index after doing the necessary
        start processing, in case there is a crash in the middle.

        """
        index = args[0]
        if self.index is None:
            # First call: start the root branch
            self.process_w_branch(index, self.root_branch, args)
            self.index = index
            return 1

        if index <= self.index:
            # Out-of-order index: warn and skip
            log.Log("Warning: oldindex %s >= newindex %s" %
                    (self.index, index), 2)
            return 1

        if self.finish_branches(index) is None:
            return None # We are no longer in the main tree
        last_branch = self.branches[-1]
        if last_branch.start_successful:
            if last_branch.can_fast_process(*args):
                robust.check_common_error(last_branch.on_error,
                                          last_branch.fast_process, args)
            else:
                branch = self.add_branch()
                self.process_w_branch(index, branch, args)
        else: last_branch.log_prev_error(index)

        self.index = index
        return 1
300
"""Helper class for IterTreeReducer above
302
There are five stub functions below: start_process, end_process,
303
branch_process, fast_process, and can_fast_process. A class that
304
subclasses this one will probably fill in these functions to do
308
base_index = index = None
310
caught_exception = start_successful = None
312
def call_end_proc(self):
313
"""Runs the end_process on self, checking for errors"""
314
if self.finished or not self.start_successful:
315
self.caught_exception = 1
317
#if self.caught_exception: self.log_prev_error(self.base_index)
318
#else: robust.check_common_error(self.on_error, self.end_process)
319
# Since all end_process does is copy over attributes, might as
320
# well run it even if we did get errors earlier.
321
robust.check_common_error(self.on_error, self.end_process)
325
def start_process(self, *args):
326
"""Do some initial processing (stub)"""
329
def end_process(self):
330
"""Do any final processing before leaving branch (stub)"""
333
def branch_process(self, branch):
334
"""Process a branch right after it is finished (stub)"""
335
assert branch.finished
338
def can_fast_process(self, *args):
339
"""True if object can be processed without new branch (stub)"""
342
def fast_process(self, *args):
343
"""Process args without new child branch (stub)"""
346
def on_error(self, exc, *args):
347
"""This is run on any exception in start/end-process"""
348
self.caught_exception = 1
349
if args and args[0] and isinstance(args[0], tuple):
350
filename = os.path.join(*args[0])
351
elif self.index: filename = os.path.join(*self.index)
353
log.Log("Error '%s' processing %s" % (exc, filename), 2)
355
def log_prev_error(self, index):
356
"""Call function if no pending exception"""
357
if not index: index_str = "."
358
else: index_str = os.path.join(*index)
359
log.Log("Skipping %s because of previous error" % index_str, 2)