26
class RetrySleeper(object):
27
"""A retry sleeper that will track its jitter, backoff and
28
sleep appropriately when asked."""
29
def __init__(self, max_tries=1, delay=0.1, backoff=2, max_jitter=0.8,
30
max_delay=3600, sleep_func=time.sleep):
31
"""Create a :class:`KazooRetry` instance
33
:param max_tries: How many times to retry the command.
34
:param delay: Initial delay between retry attempts.
35
:param backoff: Backoff multiplier between retry attempts.
36
Defaults to 2 for exponential backoff.
37
:param max_jitter: Additional max jitter period to wait between
38
retry attempts to avoid slamming the server.
39
:param max_delay: Maximum delay in seconds, regardless of other
40
backoff settings. Defaults to one hour.
43
self.sleep_func = sleep_func
44
self.max_tries = max_tries
46
self.backoff = backoff
47
self.max_jitter = int(max_jitter * 100)
48
self.max_delay = float(max_delay)
50
self._cur_delay = delay
53
"""Reset the attempt counter"""
55
self._cur_delay = self.delay
58
"""Increment the failed count, and sleep appropriately before
60
if self._attempts == self.max_tries:
61
raise RetryFailedError("Too many retry attempts")
63
jitter = random.randint(0, self.max_jitter) / 100.0
64
self.sleep_func(self._cur_delay + jitter)
65
self._cur_delay = min(self._cur_delay * self.backoff, self.max_delay)
68
"""Return a clone of this retry sleeper"""
69
return RetrySleeper(self.max_tries, self.delay, self.backoff,
70
self.max_jitter / 100.0, self.max_delay,
26
class InterruptedError(RetryFailedError):
27
"""Raised when the retry is forcibly interrupted by the interrupt
74
31
class KazooRetry(object):
87
44
def __init__(self, max_tries=1, delay=0.1, backoff=2, max_jitter=0.8,
88
max_delay=3600, ignore_expire=True, sleep_func=time.sleep):
89
"""Create a :class:`KazooRetry` instance
45
max_delay=3600, ignore_expire=True, sleep_func=time.sleep,
46
deadline=None, interrupt=None):
47
"""Create a :class:`KazooRetry` instance for retrying function
91
50
:param max_tries: How many times to retry the command.
92
51
:param delay: Initial delay between retry attempts.
99
58
:param ignore_expire:
100
59
Whether a session expiration should be ignored and treated
101
60
as a retry-able command.
62
Function that will be called with no args that may return
63
True if the retry should be ceased immediately. This will
64
be called no more than every 0.1 seconds during a wait
104
self.retry_sleeper = RetrySleeper(max_tries, delay, backoff, max_jitter,
105
max_delay, sleep_func)
68
self.max_tries = max_tries
70
self.backoff = backoff
71
self.max_jitter = int(max_jitter * 100)
72
self.max_delay = float(max_delay)
74
self._cur_delay = delay
75
self.deadline = deadline
76
self._cur_stoptime = None
106
77
self.sleep_func = sleep_func
107
78
self.retry_exceptions = self.RETRY_EXCEPTIONS
79
self.interrupt = interrupt
109
81
self.retry_exceptions += self.EXPIRED_EXCEPTIONS
84
"""Reset the attempt counter"""
86
self._cur_delay = self.delay
87
self._cur_stoptime = None
90
"""Return a clone of this retry manager"""
91
obj = KazooRetry(self.max_tries, self.delay, self.backoff,
92
self.max_jitter / 100.0, self.max_delay,
93
self.sleep_func, deadline=self.deadline,
94
interrupt=self.interrupt)
95
obj.retry_exceptions = self.retry_exceptions
111
98
def __call__(self, func, *args, **kwargs):
112
self.retry_sleeper.reset()
99
"""Call a function with arguments until it completes without
100
throwing a Kazoo exception
102
:param func: Function to call
103
:param args: Positional arguments to call the function with
104
:params kwargs: Keyword arguments to call the function with
106
The function will be called until it doesn't throw one of the
107
retryable exceptions (ConnectionLoss, OperationTimeout, or
108
ForceRetryError), and optionally retrying on session
116
if self.deadline is not None and self._cur_stoptime is None:
117
self._cur_stoptime = time.time() + self.deadline
116
118
return func(*args, **kwargs)
117
119
except ConnectionClosedError:
119
121
except self.retry_exceptions:
120
self.retry_sleeper.increment()
122
if self._attempts == self.max_tries:
123
raise RetryFailedError("Too many retry attempts")
125
sleeptime = self._cur_delay + (random.randint(0, self.max_jitter) / 100.0)
127
if self._cur_stoptime is not None and time.time() + sleeptime >= self._cur_stoptime:
128
raise RetryFailedError("Exceeded retry deadline")
132
# Break the time period down and sleep for no longer than
133
# 0.1 before calling the interrupt
135
self.sleep_func(sleeptime)
136
sleeptime -= sleeptime
141
raise InterruptedError()
143
self.sleep_func(sleeptime)
144
self._cur_delay = min(self._cur_delay * self.backoff, self.max_delay)