11
from unittest import TestCase
13
# Put the 'azure-sdk-for-python-master' directory that sits next to this
# file at the front of sys.path.  This has to run before the 'azure'
# imports below, presumably so that they resolve to that directory.
AZURE_PACKAGE = os.path.realpath(os.path.join(
    os.path.dirname(__file__), 'azure-sdk-for-python-master'))
sys.path.insert(0, AZURE_PACKAGE)
16
from azure.servicemanagement import (
19
) # nopep8 We have to put code before imports in this case.
21
from winazure import (
29
ServiceManagementService,
35
class WinAzureTestCase(TestCase):
37
def test_main_delete_unused_disks(self):
    """main() routes 'delete-unused-disks' to delete_unused_disks().

    winazure.delete_unused_disks and winazure.ServiceManagementService
    are both patched, so the test only checks how main() wires the
    command line to them: the service is built from the '-s' and '-c'
    values, and the command receives it with dry_run and verbose set.
    """
    with patch('winazure.delete_unused_disks', autospec=True) as d_mock:
        with patch('winazure.ServiceManagementService',
                   autospec=True, return_value='sms') as sms_mock:
            # '-c' and its value are separate argv items; without the
            # comma the two literals would concatenate to '-ccert.pem'.
            main(['-d', '-v', '-c', 'cert.pem', '-s', 'secret',
                  'delete-unused-disks'])
    sms_mock.assert_called_once_with('secret', 'cert.pem')
    d_mock.assert_called_once_with('sms', dry_run=True, verbose=True)
46
def test_main_list_services(self):
    """main() routes 'list-services' to list_services().

    winazure.list_services and winazure.ServiceManagementService are
    both patched.  The test checks that the service is built from the
    '-s' and '-c' values and that the positional pattern is passed on
    as the ``glob`` keyword together with verbose=True.
    """
    with patch('winazure.list_services', autospec=True) as l_mock:
        with patch('winazure.ServiceManagementService',
                   autospec=True, return_value='sms') as sms_mock:
            # '-c' and its value are separate argv items; without the
            # comma the two literals would concatenate to '-ccert.pem'.
            main(['-d', '-v', '-c', 'cert.pem', '-s', 'secret',
                  'list-services', 'juju-deploy*'])
    sms_mock.assert_called_once_with('secret', 'cert.pem')
    l_mock.assert_called_once_with(
        'sms', glob='juju-deploy*', verbose=True)
56
def test_main_delete_services(self):
    """main() routes 'delete-services' to delete_services().

    winazure.delete_services and winazure.ServiceManagementService are
    both patched.  The test checks that the service is built from the
    '-s' and '-c' values, that '-o 2' arrives as the integer
    ``old_age=2``, and that the pattern, verbose and dry_run values are
    passed on as keywords.
    """
    with patch('winazure.delete_services', autospec=True) as ds_mock:
        with patch('winazure.ServiceManagementService',
                   autospec=True, return_value='sms') as sms_mock:
            # '-c' and its value are separate argv items; without the
            # comma the two literals would concatenate to '-ccert.pem'.
            main(['-d', '-v', '-c', 'cert.pem', '-s', 'secret',
                  'delete-services', '-o', '2', 'juju-deploy*'])
    sms_mock.assert_called_once_with('secret', 'cert.pem')
    ds_mock.assert_called_once_with(
        'sms', glob='juju-deploy*', old_age=2, verbose=True, dry_run=True)
66
def test_delete_unused_disks(self):
# Checks that delete_unused_disks() lists the disks once and calls
# delete_disk(name, delete_vhd=True) exactly twice: for 'disk1' and 'disk2'.
# NOTE(review): this excerpt is whitespace-mangled.  The bare integers are
# line-number residue, the indentation is lost, and the statements that
# create disk1, disk2 and disk3 (and presumably set their names) are not
# visible here -- restore them from the original file before running.
67
sms = ServiceManagementService('secret', 'cert.pem')
70
# disk1 is not attached to anything.
disk1.attached_to = None
73
# disk2 has an attachment record, but with an empty hosted service name.
disk2.attached_to.hosted_service_name = ''
76
# disk3 is attached to hosted service 'hs3'; no delete is asserted for it.
disk3.attached_to.hosted_service_name = 'hs3'
77
disks = [disk1, disk2, disk3]
78
with patch.object(sms, 'list_disks', autospec=True,
79
return_value=disks) as ld_mock:
80
with patch.object(sms, 'delete_disk', autospec=True) as dd_mock:
81
delete_unused_disks(sms, dry_run=False, verbose=False)
82
ld_mock.assert_called_once_with()
83
dd_mock.assert_any_call('disk1', delete_vhd=True)
84
dd_mock.assert_any_call('disk2', delete_vhd=True)
85
# Exactly two deletions in total, so disk3 must have been left alone.
self.assertEqual(2, dd_mock.call_count)
87
def test_list_services(self):
# Checks that list_services() calls list_hosted_services() once and returns
# only the services whose name matches the glob 'juju-deploy-*'.
# NOTE(review): this excerpt is whitespace-mangled.  The bare integers are
# line-number residue, the indentation is lost, and the statements that
# create hs1 and hs2 and build the initial `services` list are not visible
# here -- restore them from the original file before running.
88
sms = ServiceManagementService('secret', 'cert.pem')
90
# Does not match the glob used below.
hs1.service_name = 'juju-upgrade-foo'
92
# Matches the glob used below.
hs2.service_name = 'juju-deploy-foo'
94
with patch.object(sms, 'list_hosted_services', autospec=True,
95
return_value=services) as ls_mock:
96
# `services` is rebound here to the filtered result under test.
services = list_services(sms, 'juju-deploy-*', verbose=False)
97
ls_mock.assert_called_once_with()
98
self.assertEqual([hs2], services)
100
def test_is_old_deployment(self):
# Exercises is_old_deployment() with deployments created before and after
# the cut-off given by `ago` (2 hours) relative to `now`.
# NOTE(review): this excerpt is whitespace-mangled.  The bare integers are
# line-number residue, the indentation is lost, and the statements that
# create d1 and d2 and open each assertion are not visible here (note the
# unbalanced closing parentheses) -- restore them from the original file.
101
now = datetime.utcnow()
102
ago = timedelta(hours=2)
104
# d1 was created 3 hours before `now`, i.e. earlier than the 2-hour cut-off.
d1.created_time = (now - timedelta(hours=3)).strftime(DATETIME_PATTERN)
106
is_old_deployment([d1], now, ago, verbose=False))
108
# d2 was created 1 hour before `now`, i.e. later than the 2-hour cut-off.
d2.created_time = (now - timedelta(hours=1)).strftime(DATETIME_PATTERN)
110
is_old_deployment([d2], now, ago, verbose=False))
112
# Mixed case: one deployment on each side of the cut-off.
is_old_deployment([d1, d2], now, ago, verbose=False))
114
def test_wait_for_success(self):
# Checks that wait_for_success() stops polling get_operation_status() once
# a status equal to SUCCEEDED has been returned.
# NOTE(review): this excerpt is whitespace-mangled.  The bare integers are
# line-number residue, the indentation is lost, and the statements that
# create request, op1, op2 and op3 are not visible here -- restore them
# from the original file before running.
115
sms = ServiceManagementService('secret', 'cert.pem')
117
request.request_id = 'foo'
119
# First poll result: not finished yet.
op1.status = 'Pending'
121
# Second poll result: finished.
op2.status = SUCCEEDED
123
# Third result is queued but, per the call count below, never fetched.
op3.status = 'Not Reachable'
124
with patch.object(sms, 'get_operation_status', autospec=True,
125
side_effect=[op1, op2, op3]) as gs_mock:
126
# pause=0 is passed, presumably so the test does not wait between polls.
wait_for_success(sms, request, pause=0, verbose=False)
127
self.assertEqual(2, gs_mock.call_count)
129
def test_delete_service(self):
# Checks that deleting a hosted service with one deployment deletes the
# deployment, waits on the returned request, then deletes the service.
# NOTE(review): this excerpt is whitespace-mangled.  The bare integers are
# line-number residue, the indentation is lost, and the line that opens the
# call under test (original line 140, presumably `delete_service(`) is not
# visible here -- restore it from the original file before running.
130
sms = ServiceManagementService('secret', 'cert.pem')
131
hosted_service = Mock(spec=HostedService)
132
hosted_service.service_name = 'juju-bar'
133
deployment = Mock(spec=Deployment)
134
# .name is assigned after construction rather than passed to Mock().
deployment.name = 'juju-bar-2'
135
# delete_deployment returns 'request', which wait_for_success must receive.
with patch.object(sms, 'delete_deployment',
136
return_value='request') as dd_mock:
137
with patch('winazure.wait_for_success', autospec=True) as ws_mock:
138
with patch.object(sms, 'delete_hosted_service',
139
autospec=True) as ds_mock:
141
sms, hosted_service, [deployment],
142
pause=0, dry_run=False, verbose=False)
143
dd_mock.assert_called_once_with('juju-bar', 'juju-bar-2')
144
ws_mock.assert_called_once_with(sms, 'request', pause=0, verbose=False)
145
ds_mock.assert_any_call('juju-bar')
147
def test_delete_services(self):
    """delete_services() deletes only services with old deployments.

    list_services, is_old_deployment and delete_service are patched in
    the winazure module, and get_hosted_service_properties is patched on
    the service object.  Two services are listed; is_old_deployment
    answers False for the first and True for the second, so only the
    second service (hs2, with deployment d2) may reach delete_service.
    """
    sms = ServiceManagementService('secret', 'cert.pem')
    hs1 = Mock(service_name='juju-foo')
    hs2 = Mock(service_name='juju-bar')
    # Mock(name=...) only labels the mock for its repr; the .name
    # attribute has to be assigned after construction.
    d1 = Mock()
    d1.name = 'juju-foo-1'
    p1 = Mock(deployments=[d1])
    d2 = Mock()
    d2.name = 'juju-bar-2'
    p2 = Mock(deployments=[d2])
    with patch('winazure.list_services', autospec=True,
               return_value=[hs1, hs2]) as ls_mock:
        with patch.object(sms, 'get_hosted_service_properties',
                          side_effect=[p1, p2]) as gs_mock:
            with patch('winazure.is_old_deployment', autospec=True,
                       side_effect=[False, True]) as od_mock:
                with patch('winazure.delete_service',
                           autospec=True) as ds_mock:
                    delete_services(sms, glob='juju-*', old_age=2,
                                    pause=0, dry_run=False, verbose=False)
    ls_mock.assert_called_once_with(sms, glob='juju-*', verbose=False)
    gs_mock.assert_any_call('juju-foo', embed_detail=True)
    gs_mock.assert_any_call('juju-bar', embed_detail=True)
    self.assertEqual(2, od_mock.call_count)
    # call_args holds the most recent call, i.e. the one made for hs2.
    args, kwargs = od_mock.call_args
    self.assertIs(p2.deployments, args[0])
    self.assertIsInstance(args[1], datetime)
    # old_age=2 must reach is_old_deployment as a two-hour timedelta.
    self.assertEqual(2 * 60 * 60, args[2].seconds)
    ds_mock.assert_called_once_with(
        sms, hs2, [d2], pause=0, dry_run=False, verbose=False)