1
from mock import patch, MagicMock
2
from subprocess import CalledProcessError
4
from deployer.utils import dict_merge, _check_call, ErrorExit
9
def test_relation_list_merge(self):
12
{'relations': [['m1', 'x1']]},
13
{'relations': [['m2', 'x2']]}),
14
{'relations': [['m1', 'x1'], ['m2', 'x2']]})
16
def test_no_rels_in_target(self):
20
{'relations': [['m1', 'x1'], ['m2', 'x2']]}),
21
{'a': 1, 'relations': [['m1', 'x1'], ['m2', 'x2']]})
23
@patch('subprocess.check_output')
24
def test_check_call_fail_no_retry(self, check_output):
25
_e = CalledProcessError(returncode=1, cmd=['fail'])
26
check_output.side_effect = _e
28
ErrorExit, _check_call, params=['fail'], log=MagicMock())
31
@patch('subprocess.check_output')
32
def test_check_call_fail_retry(self, check_output, sleep):
33
_e = CalledProcessError(returncode=1, cmd=['fail'])
34
check_output.side_effect = _e
36
ErrorExit, _check_call, params=['fail'], log=MagicMock(), max_retry=3)
37
# 1 failure + 3 retries
38
self.assertEquals(len(check_output.call_args_list), 4)
41
@patch('subprocess.check_output')
42
def test_check_call_succeed_after_retry(self, check_output, sleep):
43
# call succeeds after the 3rd try
44
_e = CalledProcessError(returncode=1, cmd=['maybe_fail'])
45
check_output.side_effect = [
47
output = _check_call(params=['magybe_fail'], log=MagicMock(), max_retry=3)
48
self.assertEquals(output, 'good')
49
# 1 failure + 3 retries
50
self.assertEquals(len(check_output.call_args_list), 3)