~ubuntu-branches/ubuntu/utopic/kazoo/utopic-proposed

« back to all changes in this revision

Viewing changes to kazoo/tests/test_retry.py

  • Committer: Package Import Robot
  • Author(s): Neil Williams
  • Date: 2013-08-26 06:26:20 UTC
  • mfrom: (1.1.1)
  • Revision ID: package-import@ubuntu.com-20130826062620-dv79eayvq78028jb
Tags: 1.2.1-1
* New upstream release.
* Fix sphinx documentation build on clean systems (added
  python-gevent to Build-Depends and patched config).
* Bumped standards version to 3.9.4. No changes needed.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
import unittest
2
2
 
3
3
from nose.tools import eq_
4
 
from nose.tools import raises
5
4
 
6
5
 
7
6
class TestRetrySleeper(unittest.TestCase):
8
7
 
 
8
    def _pass(self):
 
9
        pass
 
10
 
 
11
    def _fail(self, times=1):
 
12
        from kazoo.retry import ForceRetryError
 
13
        scope = dict(times=0)
 
14
 
 
15
        def inner():
 
16
            if scope['times'] >= times:
 
17
                pass
 
18
            else:
 
19
                scope['times'] += 1
 
20
                raise ForceRetryError('Failed!')
 
21
        return inner
 
22
 
9
23
    def _makeOne(self, *args, **kwargs):
10
 
        from kazoo.retry import RetrySleeper
11
 
        return RetrySleeper(*args, **kwargs)
 
24
        from kazoo.retry import KazooRetry
 
25
        return KazooRetry(*args, **kwargs)
12
26
 
13
27
    def test_reset(self):
14
 
        retry = self._makeOne(delay=0)
15
 
        retry.increment()
 
28
        retry = self._makeOne(delay=0, max_tries=2)
 
29
        retry(self._fail())
16
30
        eq_(retry._attempts, 1)
17
31
        retry.reset()
18
32
        eq_(retry._attempts, 0)
19
33
 
20
34
    def test_too_many_tries(self):
 
35
        from kazoo.retry import RetryFailedError
21
36
        retry = self._makeOne(delay=0)
22
 
        retry.increment()
23
 
 
24
 
        @raises(Exception)
25
 
        def testit():
26
 
            retry.increment()
27
 
        testit()
 
37
        self.assertRaises(RetryFailedError, retry, self._fail(times=999))
 
38
        eq_(retry._attempts, 1)
28
39
 
29
40
    def test_maximum_delay(self):
30
 
        def sleep_func(time):
 
41
        def sleep_func(_time):
31
42
            pass
32
43
 
33
44
        retry = self._makeOne(delay=10, max_tries=100, sleep_func=sleep_func)
34
 
        for i in range(10):
35
 
            retry.increment()
 
45
        retry(self._fail(times=10))
36
46
        self.assertTrue(retry._cur_delay < 4000, retry._cur_delay)
37
47
        # gevent's sleep function is picky about the type
38
48
        eq_(type(retry._cur_delay), float)