~ubuntu-branches/ubuntu/utopic/kazoo/utopic-proposed

« back to all changes in this revision

Viewing changes to kazoo/retry.py

  • Committer: Package Import Robot
  • Author(s): Neil Williams
  • Date: 2013-08-26 06:26:20 UTC
  • mfrom: (1.1.1)
  • Revision ID: package-import@ubuntu.com-20130826062620-dv79eayvq78028jb
Tags: 1.2.1-1
* New upstream release.
* Fix sphinx documentation build on clean systems (added
  python-gevent to Build-Depends and patched config).
* Bumped standards version to 3.9.4. No changes needed.

Show diffs side-by-side

added added

removed removed

Lines of Context:
6
6
    ConnectionClosedError,
7
7
    ConnectionLoss,
8
8
    KazooException,
 
9
    OperationTimeoutError,
9
10
    SessionExpiredError,
10
 
    OperationTimeoutError
11
11
)
12
12
 
13
13
log = logging.getLogger(__name__)
14
14
 
15
15
 
16
16
class ForceRetryError(Exception):
17
 
    """Raised when some recipe logic wants to force a retry"""
 
17
    """Raised when some recipe logic wants to force a retry."""
18
18
 
19
19
 
20
20
class RetryFailedError(KazooException):
23
23
    """
24
24
 
25
25
 
26
 
class RetrySleeper(object):
27
 
    """A retry sleeper that will track its jitter, backoff and
28
 
    sleep appropriately when asked."""
29
 
    def __init__(self, max_tries=1, delay=0.1, backoff=2, max_jitter=0.8,
30
 
                 max_delay=3600, sleep_func=time.sleep):
31
 
        """Create a :class:`KazooRetry` instance
32
 
 
33
 
        :param max_tries: How many times to retry the command.
34
 
        :param delay: Initial delay between retry attempts.
35
 
        :param backoff: Backoff multiplier between retry attempts.
36
 
                        Defaults to 2 for exponential backoff.
37
 
        :param max_jitter: Additional max jitter period to wait between
38
 
                           retry attempts to avoid slamming the server.
39
 
        :param max_delay: Maximum delay in seconds, regardless of other
40
 
                          backoff settings. Defaults to one hour.
41
 
 
42
 
        """
43
 
        self.sleep_func = sleep_func
44
 
        self.max_tries = max_tries
45
 
        self.delay = delay
46
 
        self.backoff = backoff
47
 
        self.max_jitter = int(max_jitter * 100)
48
 
        self.max_delay = float(max_delay)
49
 
        self._attempts = 0
50
 
        self._cur_delay = delay
51
 
 
52
 
    def reset(self):
53
 
        """Reset the attempt counter"""
54
 
        self._attempts = 0
55
 
        self._cur_delay = self.delay
56
 
 
57
 
    def increment(self):
58
 
        """Increment the failed count, and sleep appropriately before
59
 
        continuing"""
60
 
        if self._attempts == self.max_tries:
61
 
            raise RetryFailedError("Too many retry attempts")
62
 
        self._attempts += 1
63
 
        jitter = random.randint(0, self.max_jitter) / 100.0
64
 
        self.sleep_func(self._cur_delay + jitter)
65
 
        self._cur_delay = min(self._cur_delay * self.backoff, self.max_delay)
66
 
 
67
 
    def copy(self):
68
 
        """Return a clone of this retry sleeper"""
69
 
        return RetrySleeper(self.max_tries, self.delay, self.backoff,
70
 
                           self.max_jitter / 100.0, self.max_delay,
71
 
                           self.sleep_func)
 
26
class InterruptedError(RetryFailedError):
 
27
    """Raised when the retry is forcibly interrupted by the interrupt
 
28
    function"""
72
29
 
73
30
 
74
31
class KazooRetry(object):
85
42
    )
86
43
 
87
44
    def __init__(self, max_tries=1, delay=0.1, backoff=2, max_jitter=0.8,
88
 
                 max_delay=3600, ignore_expire=True, sleep_func=time.sleep):
89
 
        """Create a :class:`KazooRetry` instance
 
45
                 max_delay=3600, ignore_expire=True, sleep_func=time.sleep,
 
46
                 deadline=None, interrupt=None):
 
47
        """Create a :class:`KazooRetry` instance for retrying function
 
48
        calls
90
49
 
91
50
        :param max_tries: How many times to retry the command.
92
51
        :param delay: Initial delay between retry attempts.
99
58
        :param ignore_expire:
100
59
            Whether a session expiration should be ignored and treated
101
60
            as a retry-able command.
 
61
        :param interrupt:
 
62
            Function that will be called with no args that may return
 
63
            True if the retry should be ceased immediately. This will
 
64
            be called no more than every 0.1 seconds during a wait
 
65
            between retries.
102
66
 
103
67
        """
104
 
        self.retry_sleeper = RetrySleeper(max_tries, delay, backoff, max_jitter,
105
 
                                         max_delay, sleep_func)
 
68
        self.max_tries = max_tries
 
69
        self.delay = delay
 
70
        self.backoff = backoff
 
71
        self.max_jitter = int(max_jitter * 100)
 
72
        self.max_delay = float(max_delay)
 
73
        self._attempts = 0
 
74
        self._cur_delay = delay
 
75
        self.deadline = deadline
 
76
        self._cur_stoptime = None
106
77
        self.sleep_func = sleep_func
107
78
        self.retry_exceptions = self.RETRY_EXCEPTIONS
 
79
        self.interrupt = interrupt
108
80
        if ignore_expire:
109
81
            self.retry_exceptions += self.EXPIRED_EXCEPTIONS
110
82
 
 
83
    def reset(self):
 
84
        """Reset the attempt counter"""
 
85
        self._attempts = 0
 
86
        self._cur_delay = self.delay
 
87
        self._cur_stoptime = None
 
88
 
 
89
    def copy(self):
 
90
        """Return a clone of this retry manager"""
 
91
        obj = KazooRetry(self.max_tries, self.delay, self.backoff,
 
92
                         self.max_jitter / 100.0, self.max_delay,
 
93
                         self.sleep_func, deadline=self.deadline,
 
94
                         interrupt=self.interrupt)
 
95
        obj.retry_exceptions = self.retry_exceptions
 
96
        return obj
 
97
 
111
98
    def __call__(self, func, *args, **kwargs):
112
 
        self.retry_sleeper.reset()
 
99
        """Call a function with arguments until it completes without
 
100
        throwing a Kazoo exception
 
101
 
 
102
        :param func: Function to call
 
103
        :param args: Positional arguments to call the function with
 
104
        :params kwargs: Keyword arguments to call the function with
 
105
 
 
106
        The function will be called until it doesn't throw one of the
 
107
        retryable exceptions (ConnectionLoss, OperationTimeout, or
 
108
        ForceRetryError), and optionally retrying on session
 
109
        expiration.
 
110
 
 
111
        """
 
112
        self.reset()
113
113
 
114
114
        while True:
115
115
            try:
 
116
                if self.deadline is not None and self._cur_stoptime is None:
 
117
                    self._cur_stoptime = time.time() + self.deadline
116
118
                return func(*args, **kwargs)
117
119
            except ConnectionClosedError:
118
120
                raise
119
121
            except self.retry_exceptions:
120
 
                self.retry_sleeper.increment()
 
122
                if self._attempts == self.max_tries:
 
123
                    raise RetryFailedError("Too many retry attempts")
 
124
                self._attempts += 1
 
125
                sleeptime = self._cur_delay + (random.randint(0, self.max_jitter) / 100.0)
 
126
 
 
127
                if self._cur_stoptime is not None and time.time() + sleeptime >= self._cur_stoptime:
 
128
                    raise RetryFailedError("Exceeded retry deadline")
 
129
 
 
130
                if self.interrupt:
 
131
                    while sleeptime > 0:
 
132
                        # Break the time period down and sleep for no longer than
 
133
                        # 0.1 before calling the interrupt
 
134
                        if sleeptime < 0.1:
 
135
                            self.sleep_func(sleeptime)
 
136
                            sleeptime -= sleeptime
 
137
                        else:
 
138
                            self.sleep_func(0.1)
 
139
                            sleeptime -= 0.1
 
140
                        if self.interrupt():
 
141
                            raise InterruptedError()
 
142
                else:
 
143
                    self.sleep_func(sleeptime)
 
144
                self._cur_delay = min(self._cur_delay * self.backoff, self.max_delay)