25
27
from oslo.config import cfg
27
from cinder.tests.windows import basetestcase
30
from mox import IgnoreArg
31
from mox import stubout
33
from cinder import test
35
from cinder.image import image_utils
28
37
from cinder.tests.windows import db_fakes
29
from cinder.tests.windows import windowsutils
30
from cinder.volume.drivers import windows
38
from cinder.volume import configuration as conf
39
from cinder.volume.drivers.windows import windows
40
from cinder.volume.drivers.windows import windows_utils
36
class TestWindowsDriver(basetestcase.BaseTestCase):
46
class TestWindowsDriver(test.TestCase):
38
48
def __init__(self, method):
39
49
super(TestWindowsDriver, self).__init__(method)
52
self.lun_path_tempdir = tempfile.mkdtemp()
42
53
super(TestWindowsDriver, self).setUp()
54
self._mox = mox_lib.Mox()
55
self.stubs = stubout.StubOutForTesting()
44
windows_iscsi_lun_path='C:\iSCSIVirtualDisks',
57
windows_iscsi_lun_path=self.lun_path_tempdir,
46
self._volume_data = None
47
self._volume_data_2 = None
48
self._snapshot_data = None
49
self._connector_data = None
50
self._volume_id = '10958016-e196-42e3-9e7f-5d8927ae3099'
51
self._volume_id_2 = '20958016-e196-42e3-9e7f-5d8927ae3098'
52
self._snapshot_id = '30958016-e196-42e3-9e7f-5d8927ae3097'
53
self._iqn = "iqn.1991-05.com.microsoft:dell1160dsy"
55
59
self._setup_stubs()
57
self._drv = windows.WindowsDriver()
58
self._drv.do_setup({})
59
self._wutils = windowsutils.WindowsUtils()
60
configuration = conf.Configuration(None)
61
configuration.append_config_values(windows.windows_opts)
63
self._driver = windows.WindowsDriver(configuration=configuration)
64
self._driver.do_setup({})
67
self._mox.UnsetStubs()
69
shutil.rmtree(self.lun_path_tempdir)
70
super(TestWindowsDriver, self).tearDown()
61
72
def _setup_stubs(self):
77
self._inject_mocks_in_modules(modules_to_mock, modules_to_test)
81
if (self._volume_data_2 and
82
self._wutils.volume_exists(self._volume_data_2['name'])):
83
self._wutils.delete_volume(self._volume_data_2['name'])
85
if (self._volume_data and
86
self._wutils.volume_exists(
87
self._volume_data['name'])):
88
self._wutils.delete_volume(self._volume_data['name'])
89
if (self._snapshot_data and
90
self._wutils.snapshot_exists(
91
self._snapshot_data['name'])):
92
self._wutils.delete_snapshot(self._snapshot_data['name'])
93
if (self._connector_data and
94
self._wutils.initiator_id_exists(
95
"%s%s" % (CONF.iscsi_target_prefix,
96
self._volume_data['name']),
97
self._connector_data['initiator'])):
98
target_name = "%s%s" % (CONF.iscsi_target_prefix,
99
self._volume_data['name'])
100
initiator_name = self._connector_data['initiator']
101
self._wutils.delete_initiator_id(target_name, initiator_name)
102
if (self._volume_data and
103
self._wutils.export_exists("%s%s" %
104
(CONF.iscsi_target_prefix,
105
self._volume_data['name']))):
106
self._wutils.delete_export(
107
"%s%s" % (CONF.iscsi_target_prefix,
108
self._volume_data['name']))
111
super(TestWindowsDriver, self).tearDown()
74
def fake_wutils__init__(self):
76
windows_utils.WindowsUtils.__init__ = fake_wutils__init__
78
def fake_local_path(self, volume):
79
return os.path.join(CONF.windows_iscsi_lun_path,
80
str(volume['name']) + ".vhd")
113
82
def test_check_for_setup_errors(self):
114
self._drv.check_for_setup_error()
85
self._mox.StubOutWithMock(windows_utils.WindowsUtils,
86
'check_for_setup_error')
87
windows_utils.WindowsUtils.check_for_setup_error()
91
drv.check_for_setup_error()
116
95
def test_create_volume(self):
117
self._create_volume()
119
wt_disks = self._wutils.find_vhd_by_name(self._volume_data['name'])
120
self.assertEquals(len(wt_disks), 1)
122
def _create_volume(self):
123
self._volume_data = db_fakes.get_fake_volume_info(self._volume_id)
124
self._drv.create_volume(self._volume_data)
98
vol = db_fakes.get_fake_volume_info()
100
self.stubs.Set(drv, 'local_path', self.fake_local_path)
102
self._mox.StubOutWithMock(windows_utils.WindowsUtils,
105
windows_utils.WindowsUtils.create_volume(self.fake_local_path(vol),
106
vol['name'], vol['size'])
110
drv.create_volume(vol)
126
114
def test_delete_volume(self):
127
self._create_volume()
129
self._drv.delete_volume(self._volume_data)
131
wt_disks = self._wutils.find_vhd_by_name(self._volume_data['name'])
132
self.assertEquals(len(wt_disks), 0)
115
"""delete_volume simple test case."""
119
vol = db_fakes.get_fake_volume_info()
121
mox.StubOutWithMock(drv, 'local_path')
122
drv.local_path(vol).AndReturn(self.fake_local_path(vol))
124
self._mox.StubOutWithMock(windows_utils.WindowsUtils,
126
windows_utils.WindowsUtils.delete_volume(vol['name'],
127
self.fake_local_path(vol))
130
drv.delete_volume(vol)
134
134
def test_create_snapshot(self):
136
self._create_volume()
138
wt_disks = self._wutils.find_vhd_by_name(self._volume_data['name'])
139
self.assertEquals(len(wt_disks), 1)
140
#Create a snapshot from the previous volume
141
self._create_snapshot()
143
snapshot_name = self._snapshot_data['name']
144
wt_snapshots = self._wutils.find_snapshot_by_name(snapshot_name)
145
self.assertEquals(len(wt_snapshots), 1)
147
def _create_snapshot(self):
148
volume_name = self._volume_data['name']
149
snapshot_id = self._snapshot_id
150
self._snapshot_data = db_fakes.get_fake_snapshot_info(volume_name,
152
self._drv.create_snapshot(self._snapshot_data)
137
self._mox.StubOutWithMock(windows_utils.WindowsUtils,
139
volume = db_fakes.get_fake_volume_info()
140
snapshot = db_fakes.get_fake_snapshot_info()
142
self.stubs.Set(drv, 'local_path', self.fake_local_path(snapshot))
144
windows_utils.WindowsUtils.create_snapshot(volume['name'],
149
drv.create_snapshot(snapshot)
154
153
def test_create_volume_from_snapshot(self):
156
self._create_volume()
157
#Create a snapshot from the previous volume
158
self._create_snapshot()
160
self._volume_data_2 = db_fakes.get_fake_volume_info(self._volume_id_2)
162
self._drv.create_volume_from_snapshot(self._volume_data_2,
165
wt_disks = self._wutils.find_vhd_by_name(self._volume_data_2['name'])
166
self.assertEquals(len(wt_disks), 1)
157
snapshot = db_fakes.get_fake_snapshot_info()
158
volume = db_fakes.get_fake_volume_info()
160
self._mox.StubOutWithMock(windows_utils.WindowsUtils,
161
'create_volume_from_snapshot')
162
windows_utils.WindowsUtils.\
163
create_volume_from_snapshot(volume['name'], snapshot['name'])
167
drv.create_volume_from_snapshot(volume, snapshot)
168
171
def test_delete_snapshot(self):
170
self._create_volume()
171
#Create a snapshot from the previous volume
172
self._create_snapshot()
174
self._drv.delete_snapshot(self._snapshot_data)
176
snapshot_name = self._snapshot_data['name']
177
wt_snapshots = self._wutils.find_snapshot_by_name(snapshot_name)
178
self.assertEquals(len(wt_snapshots), 0)
175
snapshot = db_fakes.get_fake_snapshot_info()
177
self._mox.StubOutWithMock(windows_utils.WindowsUtils,
179
windows_utils.WindowsUtils.delete_snapshot(snapshot['name'])
183
drv.delete_snapshot(snapshot)
180
187
def test_create_export(self):
182
self._create_volume()
184
retval = self._drv.create_export({}, self._volume_data)
186
volume_name = self._volume_data['name']
189
{'provider_location': "%s%s" % (CONF.iscsi_target_prefix,
191
volume = db_fakes.get_fake_volume_info()
193
initiator_name = "%s%s" % (CONF.iscsi_target_prefix, volume['name'])
195
self._mox.StubOutWithMock(windows_utils.WindowsUtils,
196
'create_iscsi_target')
197
windows_utils.WindowsUtils.create_iscsi_target(initiator_name,
199
self._mox.StubOutWithMock(windows_utils.WindowsUtils,
200
'add_disk_to_target')
201
windows_utils.WindowsUtils.add_disk_to_target(volume['name'],
206
export_info = drv.create_export(None, volume)
210
self.assertEquals(export_info['provider_location'], initiator_name)
192
212
def test_initialize_connection(self):
194
self._create_volume()
196
self._drv.create_export({}, self._volume_data)
198
self._connector_data = db_fakes.get_fake_connector_info(self._iqn)
200
init_data = self._drv.initialize_connection(self._volume_data,
201
self._connector_data)
202
target_name = self._volume_data['provider_location']
203
initiator_name = self._connector_data['initiator']
205
wt_initiator_ids = self._wutils.find_initiator_ids(target_name,
207
self.assertEquals(len(wt_initiator_ids), 1)
209
properties = init_data['data']
210
self.assertNotEqual(properties['target_iqn'], None)
216
volume = db_fakes.get_fake_volume_info()
217
initiator_name = "%s%s" % (CONF.iscsi_target_prefix, volume['name'])
219
connector = db_fakes.get_fake_connector_info()
221
self._mox.StubOutWithMock(windows_utils.WindowsUtils,
222
'associate_initiator_with_iscsi_target')
223
windows_utils.WindowsUtils.associate_initiator_with_iscsi_target(
224
volume['provider_location'], initiator_name, )
226
self._mox.StubOutWithMock(windows_utils.WindowsUtils,
227
'get_host_information')
228
windows_utils.WindowsUtils.get_host_information(
229
volume, volume['provider_location'])
233
drv.initialize_connection(volume, connector)
237
def test_terminate_connection(self):
241
volume = db_fakes.get_fake_volume_info()
242
initiator_name = "%s%s" % (CONF.iscsi_target_prefix, volume['name'])
243
connector = db_fakes.get_fake_connector_info()
245
self._mox.StubOutWithMock(windows_utils.WindowsUtils,
246
'delete_iscsi_target')
247
windows_utils.WindowsUtils.delete_iscsi_target(
248
initiator_name, volume['provider_location'])
252
drv.terminate_connection(volume, connector)
212
256
def test_ensure_export(self):
214
self._create_volume()
216
self._drv.ensure_export({}, self._volume_data)
260
volume = db_fakes.get_fake_volume_info()
262
initiator_name = "%s%s" % (CONF.iscsi_target_prefix, volume['name'])
264
self._mox.StubOutWithMock(windows_utils.WindowsUtils,
265
'create_iscsi_target')
266
windows_utils.WindowsUtils.create_iscsi_target(initiator_name, True)
267
self._mox.StubOutWithMock(windows_utils.WindowsUtils,
268
'add_disk_to_target')
269
windows_utils.WindowsUtils.add_disk_to_target(volume['name'],
274
drv.ensure_export(None, volume)
218
278
def test_remove_export(self):
220
self._create_volume()
222
self._drv.create_export({}, self._volume_data)
224
self._drv.remove_export({}, self._volume_data)
282
volume = db_fakes.get_fake_volume_info()
284
target_name = volume['provider_location']
286
self._mox.StubOutWithMock(windows_utils.WindowsUtils,
287
'remove_iscsi_target')
288
windows_utils.WindowsUtils.remove_iscsi_target(target_name)
292
drv.remove_export(None, volume)
296
def test_copy_image_to_volume(self):
297
"""resize_image common case usage."""
301
volume = db_fakes.get_fake_volume_info()
303
self.stubs.Set(drv, 'local_path', self.fake_local_path)
305
mox.StubOutWithMock(image_utils, 'fetch_to_vhd')
306
image_utils.fetch_to_vhd(None, None, None,
307
self.fake_local_path(volume))
311
drv.copy_image_to_volume(None, volume, None, None)
315
def test_copy_volume_to_image(self):
319
vol = db_fakes.get_fake_volume_info()
321
image_meta = db_fakes.get_fake_image_meta()
323
self.stubs.Set(drv, 'local_path', self.fake_local_path)
325
mox.StubOutWithMock(image_utils, 'upload_volume')
327
temp_vhd_path = os.path.join(CONF.image_conversion_dir,
328
str(image_meta['id']) + ".vhd")
330
image_utils.upload_volume(None, None, image_meta, temp_vhd_path, 'vpc')
332
self._mox.StubOutWithMock(windows_utils.WindowsUtils,
335
windows_utils.WindowsUtils.copy_vhd_disk(self.fake_local_path(vol),
340
drv.copy_volume_to_image(None, vol, None, image_meta)
344
def test_create_cloned_volume(self):
348
volume = db_fakes.get_fake_volume_info()
349
volume_cloned = db_fakes.get_fake_volume_info_cloned()
351
self._mox.StubOutWithMock(windows_utils.WindowsUtils,
354
windows_utils.WindowsUtils.create_volume(IgnoreArg(), IgnoreArg(),
357
self._mox.StubOutWithMock(windows_utils.WindowsUtils,
359
windows_utils.WindowsUtils.copy_vhd_disk(self.fake_local_path(
360
volume_cloned), self.fake_local_path(volume))
364
drv.create_cloned_volume(volume, volume_cloned)
368
def test_extend_volume(self):
372
volume = db_fakes.get_fake_volume_info()
374
TEST_VOLUME_ADDITIONAL_SIZE_MB = 1024
375
TEST_VOLUME_ADDITIONAL_SIZE_GB = 1
377
self._mox.StubOutWithMock(windows_utils.WindowsUtils, 'extend')
379
windows_utils.WindowsUtils.extend(volume['name'],
380
TEST_VOLUME_ADDITIONAL_SIZE_MB)
382
new_size = volume['size'] + TEST_VOLUME_ADDITIONAL_SIZE_GB
386
drv.extend_volume(volume, new_size)