1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
|
# -*- Mode:Python; indent-tabs-mode:nil; tab-width:4 -*-
#
# Copyright 2002 Ben Escoto <ben@emerose.org>
# Copyright 2007 Kenneth Loafman <kenneth@loafman.com>
# Copyright 2008 Peter Schuller <peter.schuller@infidyne.com>
#
# This file is part of duplicity.
#
# Duplicity is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# Duplicity is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with duplicity; if not, write to the Free Software Foundation,
# Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
"""
Asynchronous job scheduler, for concurrent execution with minimalistic
dependency guarantees.
"""
import duplicity
from duplicity import log
from duplicity.dup_threading import require_threading
from duplicity.dup_threading import interruptably_wait
from duplicity.dup_threading import async_split
from duplicity.dup_threading import with_lock
thread = duplicity.dup_threading.thread_module()
threading = duplicity.dup_threading.threading_module()
class AsyncScheduler:
"""
Easy-to-use scheduler of function calls to be executed
concurrently. A very simple dependency mechanism exists in the
form of barriers (see insert_barrier()).
Each instance has a concurrency level associated with it. A
concurrency of 0 implies that all tasks will be executed
synchronously when scheduled. A concurrency of 1 indicates that a
task will be executed asynchronously, but never concurrently with
other tasks. Both 0 and 1 guarantee strict ordering among all
tasks (i.e., they will be executed in the order scheduled).
At concurrency levels above 1, the tasks will end up being
executed in an order undetermined except insofar as is enforced by
calls to insert_barrier().
An AsynchScheduler should be created for any independent process;
the scheduler will assume that if any background job fails (raises
an exception), it makes further work moot.
"""
def __init__(self, concurrency):
"""
Create an asynchronous scheduler that executes jobs with the
given level of concurrency.
"""
log.Info("%s: %s" % (self.__class__.__name__,
_("instantiating at concurrency %d") %
(concurrency)))
assert concurrency >= 0, "%s concurrency level must be >= 0" % (self.__class__.__name__,)
self.__failed = False # has at least one task failed so far?
self.__failed_waiter = None # when __failed, the waiter of the first task that failed
self.__concurrency = concurrency
self.__worker_count = 0 # number of active workers
self.__waiter_count = 0 # number of threads waiting to submit work
self.__barrier = False # barrier currently in effect?
self.__cv = threading.Condition() # for simplicity, we use a single cv with its lock
# # for everything, even if the resulting notifyAll():s
# # are not technically efficient.
if concurrency > 0:
require_threading("concurrency > 0 (%d)" % (concurrency,))
def insert_barrier(self):
"""
Proclaim that any tasks scheduled prior to the call to this
method MUST be executed prior to any tasks scheduled after the
call to this method.
The intended use case is that if task B depends on A, a
barrier must be inserted in between to guarantee that A
happens before B.
"""
log.Debug("%s: %s" % (self.__class__.__name__, _("inserting barrier")))
# With concurrency 0 it's a NOOP, and due to the special case in
# task scheduling we do not want to append to the queue (will never
# be popped).
if self.__concurrency > 0:
def _insert_barrier():
self.__barrier = True
with_lock(self.__cv, _insert_barrier)
def schedule_task(self, fn, params):
"""
Schedule the given task (callable, typically function) for
execution. Pass the given parameters to the function when
calling it. Returns a callable which can optionally be used
to wait for the task to complete, either by returning its
return value or by propagating any exception raised by said
task.
This method may block or return immediately, depending on the
configuration and state of the scheduler.
This method may also raise an exception in order to trigger
failures early, if the task (if run synchronously) or a previous
task has already failed.
NOTE: Pay particular attention to the scope in which this is
called. In particular, since it will execute concurrently in
the background, assuming fn is a closure, any variables used
most be properly bound in the closure. This is the reason for
the convenience feature of being able to give parameters to
the call, to avoid having to wrap the call itself in a
function in order to "fixate" variables in, for example, an
enclosing loop.
"""
assert fn is not None
# Note: It is on purpose that we keep track of concurrency in
# the front end and launch threads for each task, rather than
# keep a pool of workers. The overhead is not relevant in the
# situation this will be used, and it removes complexity in
# terms of ensuring the scheduler is garbage collected/shut
# down properly when no longer referenced/needed by calling
# code.
if self.__concurrency == 0:
# special case this to not require any platform support for
# threading at all
log.Info("%s: %s" % (self.__class__.__name__,
_("running task synchronously (asynchronicity disabled)")),
log.InfoCode.synchronous_upload_begin)
return self.__run_synchronously(fn, params)
else:
log.Info("%s: %s" % (self.__class__.__name__,
_("scheduling task for asynchronous execution")),
log.InfoCode.asynchronous_upload_begin)
return self.__run_asynchronously(fn, params)
def wait(self):
"""
Wait for the scheduler to become entirely empty (i.e., all
tasks having run to completion).
IMPORTANT: This is only useful with a single caller scheduling
tasks, such that no call to schedule_task() is currently in
progress or may happen subsequently to the call to wait().
"""
def _wait():
interruptably_wait(self.__cv, lambda: self.__worker_count == 0 and self.__waiter_count == 0)
with_lock(self.__cv, _wait)
def __run_synchronously(self, fn, params):
# When running synchronously, we immediately leak any exception raised
# for immediate failure reporting to calling code.
ret = fn(*params)
def _waiter():
return ret
log.Info("%s: %s" % (self.__class__.__name__,
_("task completed successfully")),
log.InfoCode.synchronous_upload_done)
return _waiter
def __run_asynchronously(self, fn, params):
(waiter, caller) = async_split(lambda: fn(*params))
def check_pending_failure():
if self.__failed:
log.Info("%s: %s" % (self.__class__.__name__,
_("a previously scheduled task has failed; "
"propagating the result immediately")),
log.InfoCode.asynchronous_upload_done)
self.__failed_waiter()
raise AssertionError("%s: waiter should have raised an exception; "
"this is a bug" % (self.__class__.__name__,))
def wait_for_and_register_launch():
check_pending_failure() # raise on fail
while self.__worker_count >= self.__concurrency or self.__barrier:
if self.__worker_count == 0:
assert self.__barrier, "barrier should be in effect"
self.__barrier = False
self.__cv.notifyAll()
else:
self.__waiter_count += 1
self.__cv.wait()
self.__waiter_count -= 1
check_pending_failure() # raise on fail
self.__worker_count += 1
log.Debug("%s: %s" % (self.__class__.__name__,
_("active workers = %d") % (self.__worker_count,)))
# simply wait for an OK condition to start, then launch our worker. the worker
# never waits on us, we just wait for them.
with_lock(self.__cv, wait_for_and_register_launch)
self.__start_worker(caller)
return waiter
def __start_worker(self, caller):
"""
Start a new worker.
"""
def trampoline():
try:
self.__execute_caller(caller)
finally:
def complete_worker():
self.__worker_count -= 1
log.Debug("%s: %s" % (self.__class__.__name__,
_("active workers = %d") % (self.__worker_count,)))
self.__cv.notifyAll()
with_lock(self.__cv, complete_worker)
thread.start_new_thread(trampoline, ())
def __execute_caller(self, caller):
# The caller half that we get here will not propagate
# errors back to us, but rather propagate it back to the
# "other half" of the async split.
succeeded, waiter = caller()
if not succeeded:
def _signal_failed():
if not self.__failed:
self.__failed = True
self.__failed_waiter = waiter
self.__cv.notifyAll()
with_lock(self.__cv, _signal_failed)
log.Info("%s: %s" % (self.__class__.__name__,
_("task execution done (success: %s)") % succeeded),
log.InfoCode.asynchronous_upload_done)
|