~yajo/duplicity/duplicity

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
# -*- Mode:Python; indent-tabs-mode:nil; tab-width:4 -*-
#
# Copyright 2002 Ben Escoto <ben@emerose.org>
# Copyright 2007 Kenneth Loafman <kenneth@loafman.com>
#
# This file is part of duplicity.
#
# Duplicity is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# Duplicity is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with duplicity; if not, write to the Free Software Foundation,
# Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

"""
Duplicity specific but otherwise generic threading interfaces and
utilities.

(Not called "threading" because we do not want to conflict with
the standard threading module, and absolute imports require
at least python 2.5.)
"""

_threading_supported = True

try:
    import thread
except ImportError:
    import dummy_thread as thread
    _threading_supported = False

try:
    import threading
except ImportError:
    import dummy_threading as threading
    _threading_supported = False

import sys

from duplicity import errors


def threading_supported():
    """
    Returns whether threading is supported on the system we are
    running on.
    """
    return _threading_supported


def require_threading(reason=None):
    """
    Assert that threading is required for operation to continue. Raise
    an appropriate exception if this is not the case.

    Reason specifies an optional reason why threading is required,
    which will be used for error reporting in case threading is not
    supported.
    """
    if not threading_supported():
        if reason is None:
            reason = "(no reason given)"
        raise errors.NotSupported("threading was needed because [%s], but "
                                  "is not supported by the python "
                                  "interpreter" % (reason,))


def thread_module():
    """
    Returns the thread module, or dummy_thread if threading is not
    supported.
    """
    return thread


def threading_module():
    """
    Returns the threading module, or dummy_thread if threading is not
    supported.
    """
    return threading


def with_lock(lock, fn):
    """
    Call fn with lock acquired. Guarantee that lock is released upon
    the return of fn.

    Returns the value returned by fn, or raises the exception raised
    by fn.

    (Lock can actually be anything responding to acquire() and
    release().)
    """
    lock.acquire()

    try:
        return fn()
    finally:
        lock.release()


def interruptably_wait(cv, waitFor):
    """
    cv   - The threading.Condition instance to wait on
    test - Callable returning a boolean to indicate whether
           the criteria being waited on has been satisfied.

    Perform a wait on a condition such that it is keyboard
    interruptable when done in the main thread. Due to Python
    limitations as of <= 2.5, lock acquisition and conditions waits
    are not interruptable when performed in the main thread.

    Currently, this comes at a cost additional CPU use, compared to a
    normal wait. Future implementations may be more efficient if the
    underlying python supports it.

    The condition must be acquired.

    This function should only be used on conditions that are never
    expected to be acquired for extended periods of time, or the
    lock-acquire of the underlying condition could cause an
    uninterruptable state despite the efforts of this function.

    There is no equivalent for acquireing a lock, as that cannot be
    done efficiently.

    Example:

    Instead of:

      cv.acquire()
      while not thing_done:
        cv.wait(someTimeout)
      cv.release()

    do:

      cv.acquire()
      interruptable_condwait(cv, lambda: thing_done)
      cv.release()

    """
    # We can either poll at some interval, or wait with a short enough
    # timeout to be practical (i.e., such that it interactively seems
    # to response semi-immediately to an interrupt).
    #
    # Both approaches waste CPU, but the latter approach does not
    # imply a latency penalty in the common case of a
    # notify.
    while not waitFor():
        cv.wait(0.1)


def async_split(fn):
    """
    Splits the act of calling the given function into one front-end
    part for waiting on the result, and a back-end part for performing
    the work in another thread.

    Returns (waiter, caller) where waiter is a function to be called
    in order to wait for the results of an asynchronous invokation of
    fn to complete, returning fn's result or propagating it's
    exception.

    Caller is the function to call in a background thread in order to
    execute fn asynchronously. Caller will return (success, waiter)
    where success is a boolean indicating whether the function
    suceeded (did NOT raise an exception), and waiter is the waiter
    that was originally returned by the call to async_split().
    """
    # Implementation notes:
    #
    # We use a dictionary to track the state of the asynchronous call,
    # rather than local variables. This is to get around the way
    # closures work with respect to local variables in Python. We do
    # not care about hash lookup overhead since this is intended to be
    # used for significant amounts of work.

    cv = threading.Condition()  # @UndefinedVariable
    state = {'done': False,
             'error': None,
             'trace': None,
             'value': None}

    def waiter():
        cv.acquire()
        try:
            interruptably_wait(cv, lambda: state['done'])

            if state['error'] is None:
                return state['value']
            else:
                raise state['error'].with_traceback(state['trace'])
        finally:
            cv.release()

    def caller():
        try:
            value = fn()

            cv.acquire()
            state['done'] = True
            state['value'] = value
            cv.notify()
            cv.release()

            return (True, waiter)
        except Exception as e:
            cv.acquire()
            state['done'] = True
            state['error'] = e
            state['trace'] = sys.exc_info()[2]
            cv.notify()
            cv.release()

            return (False, waiter)

    return (waiter, caller)


class Value:
    """
    A thread-safe container of a reference to an object (but not the
    object itself).

    In particular this means it is safe to:

      value.set(1)

    But unsafe to:

      value.get()['key'] = value

    Where the latter must be done using something like:

      def _setprop():
        value.get()['key'] = value

      with_lock(value, _setprop)

    Operations such as increments are best done as:

      value.transform(lambda val: val + 1)
    """

    def __init__(self, value=None):
        """
        Initialuze with the given value.
        """
        self.__value = value

        self.__cv = threading.Condition()  # @UndefinedVariable

    def get(self):
        """
        Returns the value protected by this Value.
        """
        return with_lock(self.__cv, lambda: self.__value)

    def set(self, value):
        """
        Resets the value protected by this Value.
        """
        def _set():
            self.__value = value

        with_lock(self.__cv, _set)

    def transform(self, fn):
        """
        Call fn with the current value as the parameter, and reset the
        value to the return value of fn.

        During the execution of fn, all other access to this Value is
        prevented.

        If fn raised an exception, the value is not reset.

        Returns the value returned by fn, or raises the exception
        raised by fn.
        """
        def _transform():
            self.__value = fn(self.__value)
            return self.__value

        return with_lock(self.cv, _transform)

    def acquire(self):
        """
        Acquire this Value for mutually exclusive access. Only ever
        needed when calling code must perform operations that cannot
        be done with get(), set() or transform().
        """
        self.__cv.acquire()

    def release(self):
        """
        Release this Value for mutually exclusive access.
        """
        self.__cv.release()