~ubuntu-branches/ubuntu/utopic/cinder/utopic

« back to all changes in this revision

Viewing changes to cinder/tests/test_windows.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short, James Page, Adam Gandelman, Chuck Short
  • Date: 2013-09-08 21:09:46 UTC
  • mfrom: (1.1.18)
  • Revision ID: package-import@ubuntu.com-20130908210946-3dbzq1jy5uji4wad
Tags: 1:2013.2~b3-0ubuntu1
[ James Page ]
* d/control: Switch ceph-common -> python-ceph in line with upstream
  refactoring of Ceph RBD driver, move to Suggests of python-cinder.
  (LP: #1190791). 

[ Adam Gandelman ]
* debian/patches/avoid_paramiko_vers_depends.patch: Dropped, no longer
  required.
* Add minimum requirement python-greenlet (>= 0.3.2).
* Add minimum requirement python-eventlet (>= 0.12.0).
* Add minimum requirement python-paramiko (>= 1.8).

[ Chuck Short ]
* New upstream release.
* debian/patches/skip-sqlachemy-failures.patch: Skip test failures
  with sqlalchemy 0.8 until they are fixed upstream.
* debian/control: Add python-babel to build-depends.
* debian/control: Add python-novaclient to build-depends.

Show diffs side-by-side

added added

removed removed

Lines of Context:
20
20
"""
21
21
 
22
22
 
23
 
import sys
 
23
import os
 
24
import shutil
 
25
import tempfile
24
26
 
25
27
from oslo.config import cfg
26
28
 
27
 
from cinder.tests.windows import basetestcase
 
29
import mox as mox_lib
 
30
from mox import IgnoreArg
 
31
from mox import stubout
 
32
 
 
33
from cinder import test
 
34
 
 
35
from cinder.image import image_utils
 
36
 
28
37
from cinder.tests.windows import db_fakes
29
 
from cinder.tests.windows import windowsutils
30
 
from cinder.volume.drivers import windows
 
38
from cinder.volume import configuration as conf
 
39
from cinder.volume.drivers.windows import windows
 
40
from cinder.volume.drivers.windows import windows_utils
31
41
 
32
42
 
33
43
CONF = cfg.CONF
34
44
 
35
45
 
36
 
class TestWindowsDriver(basetestcase.BaseTestCase):
 
46
class TestWindowsDriver(test.TestCase):
37
47
 
38
48
    def __init__(self, method):
39
49
        super(TestWindowsDriver, self).__init__(method)
40
50
 
41
51
    def setUp(self):
 
52
        self.lun_path_tempdir = tempfile.mkdtemp()
42
53
        super(TestWindowsDriver, self).setUp()
 
54
        self._mox = mox_lib.Mox()
 
55
        self.stubs = stubout.StubOutForTesting()
43
56
        self.flags(
44
 
            windows_iscsi_lun_path='C:\iSCSIVirtualDisks',
 
57
            windows_iscsi_lun_path=self.lun_path_tempdir,
45
58
        )
46
 
        self._volume_data = None
47
 
        self._volume_data_2 = None
48
 
        self._snapshot_data = None
49
 
        self._connector_data = None
50
 
        self._volume_id = '10958016-e196-42e3-9e7f-5d8927ae3099'
51
 
        self._volume_id_2 = '20958016-e196-42e3-9e7f-5d8927ae3098'
52
 
        self._snapshot_id = '30958016-e196-42e3-9e7f-5d8927ae3097'
53
 
        self._iqn = "iqn.1991-05.com.microsoft:dell1160dsy"
54
 
 
55
59
        self._setup_stubs()
56
 
 
57
 
        self._drv = windows.WindowsDriver()
58
 
        self._drv.do_setup({})
59
 
        self._wutils = windowsutils.WindowsUtils()
 
60
        configuration = conf.Configuration(None)
 
61
        configuration.append_config_values(windows.windows_opts)
 
62
 
 
63
        self._driver = windows.WindowsDriver(configuration=configuration)
 
64
        self._driver.do_setup({})
 
65
 
 
66
    def tearDown(self):
 
67
        self._mox.UnsetStubs()
 
68
        self.stubs.UnsetAll()
 
69
        shutil.rmtree(self.lun_path_tempdir)
 
70
        super(TestWindowsDriver, self).tearDown()
60
71
 
61
72
    def _setup_stubs(self):
62
73
 
63
 
        # Modules to mock
64
 
        modules_to_mock = [
65
 
            'wmi',
66
 
            'os',
67
 
            'subprocess',
68
 
            'multiprocessing'
69
 
        ]
70
 
 
71
 
        modules_to_test = [
72
 
            windows,
73
 
            windowsutils,
74
 
            sys.modules[__name__]
75
 
        ]
76
 
 
77
 
        self._inject_mocks_in_modules(modules_to_mock, modules_to_test)
78
 
 
79
 
    def tearDown(self):
80
 
        try:
81
 
            if (self._volume_data_2 and
82
 
                    self._wutils.volume_exists(self._volume_data_2['name'])):
83
 
                self._wutils.delete_volume(self._volume_data_2['name'])
84
 
 
85
 
            if (self._volume_data and
86
 
                    self._wutils.volume_exists(
87
 
                        self._volume_data['name'])):
88
 
                self._wutils.delete_volume(self._volume_data['name'])
89
 
            if (self._snapshot_data and
90
 
                    self._wutils.snapshot_exists(
91
 
                        self._snapshot_data['name'])):
92
 
                self._wutils.delete_snapshot(self._snapshot_data['name'])
93
 
            if (self._connector_data and
94
 
                    self._wutils.initiator_id_exists(
95
 
                        "%s%s" % (CONF.iscsi_target_prefix,
96
 
                                  self._volume_data['name']),
97
 
                        self._connector_data['initiator'])):
98
 
                target_name = "%s%s" % (CONF.iscsi_target_prefix,
99
 
                                        self._volume_data['name'])
100
 
                initiator_name = self._connector_data['initiator']
101
 
                self._wutils.delete_initiator_id(target_name, initiator_name)
102
 
            if (self._volume_data and
103
 
                    self._wutils.export_exists("%s%s" %
104
 
                                               (CONF.iscsi_target_prefix,
105
 
                                                self._volume_data['name']))):
106
 
                self._wutils.delete_export(
107
 
                    "%s%s" % (CONF.iscsi_target_prefix,
108
 
                              self._volume_data['name']))
109
 
 
110
 
        finally:
111
 
            super(TestWindowsDriver, self).tearDown()
 
74
        def fake_wutils__init__(self):
 
75
            pass
 
76
        windows_utils.WindowsUtils.__init__ = fake_wutils__init__
 
77
 
 
78
    def fake_local_path(self, volume):
 
79
            return os.path.join(CONF.windows_iscsi_lun_path,
 
80
                                str(volume['name']) + ".vhd")
112
81
 
113
82
    def test_check_for_setup_errors(self):
114
 
        self._drv.check_for_setup_error()
 
83
        mox = self._mox
 
84
        drv = self._driver
 
85
        self._mox.StubOutWithMock(windows_utils.WindowsUtils,
 
86
                                  'check_for_setup_error')
 
87
        windows_utils.WindowsUtils.check_for_setup_error()
 
88
 
 
89
        mox.ReplayAll()
 
90
 
 
91
        drv.check_for_setup_error()
 
92
 
 
93
        mox.VerifyAll()
115
94
 
116
95
    def test_create_volume(self):
117
 
        self._create_volume()
118
 
 
119
 
        wt_disks = self._wutils.find_vhd_by_name(self._volume_data['name'])
120
 
        self.assertEquals(len(wt_disks), 1)
121
 
 
122
 
    def _create_volume(self):
123
 
        self._volume_data = db_fakes.get_fake_volume_info(self._volume_id)
124
 
        self._drv.create_volume(self._volume_data)
 
96
        mox = self._mox
 
97
        drv = self._driver
 
98
        vol = db_fakes.get_fake_volume_info()
 
99
 
 
100
        self.stubs.Set(drv, 'local_path', self.fake_local_path)
 
101
 
 
102
        self._mox.StubOutWithMock(windows_utils.WindowsUtils,
 
103
                                  'create_volume')
 
104
 
 
105
        windows_utils.WindowsUtils.create_volume(self.fake_local_path(vol),
 
106
                                                 vol['name'], vol['size'])
 
107
 
 
108
        mox.ReplayAll()
 
109
 
 
110
        drv.create_volume(vol)
 
111
 
 
112
        mox.VerifyAll()
125
113
 
126
114
    def test_delete_volume(self):
127
 
        self._create_volume()
128
 
 
129
 
        self._drv.delete_volume(self._volume_data)
130
 
 
131
 
        wt_disks = self._wutils.find_vhd_by_name(self._volume_data['name'])
132
 
        self.assertEquals(len(wt_disks), 0)
 
115
        """delete_volume simple test case."""
 
116
        mox = self._mox
 
117
        drv = self._driver
 
118
 
 
119
        vol = db_fakes.get_fake_volume_info()
 
120
 
 
121
        mox.StubOutWithMock(drv, 'local_path')
 
122
        drv.local_path(vol).AndReturn(self.fake_local_path(vol))
 
123
 
 
124
        self._mox.StubOutWithMock(windows_utils.WindowsUtils,
 
125
                                  'delete_volume')
 
126
        windows_utils.WindowsUtils.delete_volume(vol['name'],
 
127
                                                 self.fake_local_path(vol))
 
128
        mox.ReplayAll()
 
129
 
 
130
        drv.delete_volume(vol)
 
131
 
 
132
        mox.VerifyAll()
133
133
 
134
134
    def test_create_snapshot(self):
135
 
        #Create a volume
136
 
        self._create_volume()
137
 
 
138
 
        wt_disks = self._wutils.find_vhd_by_name(self._volume_data['name'])
139
 
        self.assertEquals(len(wt_disks), 1)
140
 
        #Create a snapshot from the previous volume
141
 
        self._create_snapshot()
142
 
 
143
 
        snapshot_name = self._snapshot_data['name']
144
 
        wt_snapshots = self._wutils.find_snapshot_by_name(snapshot_name)
145
 
        self.assertEquals(len(wt_snapshots), 1)
146
 
 
147
 
    def _create_snapshot(self):
148
 
        volume_name = self._volume_data['name']
149
 
        snapshot_id = self._snapshot_id
150
 
        self._snapshot_data = db_fakes.get_fake_snapshot_info(volume_name,
151
 
                                                              snapshot_id)
152
 
        self._drv.create_snapshot(self._snapshot_data)
 
135
        mox = self._mox
 
136
        drv = self._driver
 
137
        self._mox.StubOutWithMock(windows_utils.WindowsUtils,
 
138
                                  'create_snapshot')
 
139
        volume = db_fakes.get_fake_volume_info()
 
140
        snapshot = db_fakes.get_fake_snapshot_info()
 
141
 
 
142
        self.stubs.Set(drv, 'local_path', self.fake_local_path(snapshot))
 
143
 
 
144
        windows_utils.WindowsUtils.create_snapshot(volume['name'],
 
145
                                                   snapshot['name'])
 
146
 
 
147
        mox.ReplayAll()
 
148
 
 
149
        drv.create_snapshot(snapshot)
 
150
 
 
151
        mox.VerifyAll()
153
152
 
154
153
    def test_create_volume_from_snapshot(self):
155
 
        #Create a volume
156
 
        self._create_volume()
157
 
        #Create a snapshot from the previous volume
158
 
        self._create_snapshot()
159
 
 
160
 
        self._volume_data_2 = db_fakes.get_fake_volume_info(self._volume_id_2)
161
 
 
162
 
        self._drv.create_volume_from_snapshot(self._volume_data_2,
163
 
                                              self._snapshot_data)
164
 
 
165
 
        wt_disks = self._wutils.find_vhd_by_name(self._volume_data_2['name'])
166
 
        self.assertEquals(len(wt_disks), 1)
 
154
        mox = self._mox
 
155
        drv = self._driver
 
156
 
 
157
        snapshot = db_fakes.get_fake_snapshot_info()
 
158
        volume = db_fakes.get_fake_volume_info()
 
159
 
 
160
        self._mox.StubOutWithMock(windows_utils.WindowsUtils,
 
161
                                  'create_volume_from_snapshot')
 
162
        windows_utils.WindowsUtils.\
 
163
            create_volume_from_snapshot(volume['name'], snapshot['name'])
 
164
 
 
165
        mox.ReplayAll()
 
166
 
 
167
        drv.create_volume_from_snapshot(volume, snapshot)
 
168
 
 
169
        mox.VerifyAll()
167
170
 
168
171
    def test_delete_snapshot(self):
169
 
        #Create a volume
170
 
        self._create_volume()
171
 
        #Create a snapshot from the previous volume
172
 
        self._create_snapshot()
173
 
 
174
 
        self._drv.delete_snapshot(self._snapshot_data)
175
 
 
176
 
        snapshot_name = self._snapshot_data['name']
177
 
        wt_snapshots = self._wutils.find_snapshot_by_name(snapshot_name)
178
 
        self.assertEquals(len(wt_snapshots), 0)
 
172
        mox = self._mox
 
173
        drv = self._driver
 
174
 
 
175
        snapshot = db_fakes.get_fake_snapshot_info()
 
176
 
 
177
        self._mox.StubOutWithMock(windows_utils.WindowsUtils,
 
178
                                  'delete_snapshot')
 
179
        windows_utils.WindowsUtils.delete_snapshot(snapshot['name'])
 
180
 
 
181
        mox.ReplayAll()
 
182
 
 
183
        drv.delete_snapshot(snapshot)
 
184
 
 
185
        mox.VerifyAll()
179
186
 
180
187
    def test_create_export(self):
181
 
        #Create a volume
182
 
        self._create_volume()
183
 
 
184
 
        retval = self._drv.create_export({}, self._volume_data)
185
 
 
186
 
        volume_name = self._volume_data['name']
187
 
        self.assertEquals(
188
 
            retval,
189
 
            {'provider_location': "%s%s" % (CONF.iscsi_target_prefix,
190
 
                                            volume_name)})
 
188
        mox = self._mox
 
189
        drv = self._driver
 
190
 
 
191
        volume = db_fakes.get_fake_volume_info()
 
192
 
 
193
        initiator_name = "%s%s" % (CONF.iscsi_target_prefix, volume['name'])
 
194
 
 
195
        self._mox.StubOutWithMock(windows_utils.WindowsUtils,
 
196
                                  'create_iscsi_target')
 
197
        windows_utils.WindowsUtils.create_iscsi_target(initiator_name,
 
198
                                                       IgnoreArg())
 
199
        self._mox.StubOutWithMock(windows_utils.WindowsUtils,
 
200
                                  'add_disk_to_target')
 
201
        windows_utils.WindowsUtils.add_disk_to_target(volume['name'],
 
202
                                                      initiator_name)
 
203
 
 
204
        mox.ReplayAll()
 
205
 
 
206
        export_info = drv.create_export(None, volume)
 
207
 
 
208
        mox.VerifyAll()
 
209
 
 
210
        self.assertEquals(export_info['provider_location'], initiator_name)
191
211
 
192
212
    def test_initialize_connection(self):
193
 
        #Create a volume
194
 
        self._create_volume()
195
 
 
196
 
        self._drv.create_export({}, self._volume_data)
197
 
 
198
 
        self._connector_data = db_fakes.get_fake_connector_info(self._iqn)
199
 
 
200
 
        init_data = self._drv.initialize_connection(self._volume_data,
201
 
                                                    self._connector_data)
202
 
        target_name = self._volume_data['provider_location']
203
 
        initiator_name = self._connector_data['initiator']
204
 
 
205
 
        wt_initiator_ids = self._wutils.find_initiator_ids(target_name,
206
 
                                                           initiator_name)
207
 
        self.assertEquals(len(wt_initiator_ids), 1)
208
 
 
209
 
        properties = init_data['data']
210
 
        self.assertNotEqual(properties['target_iqn'], None)
 
213
        mox = self._mox
 
214
        drv = self._driver
 
215
 
 
216
        volume = db_fakes.get_fake_volume_info()
 
217
        initiator_name = "%s%s" % (CONF.iscsi_target_prefix, volume['name'])
 
218
 
 
219
        connector = db_fakes.get_fake_connector_info()
 
220
 
 
221
        self._mox.StubOutWithMock(windows_utils.WindowsUtils,
 
222
                                  'associate_initiator_with_iscsi_target')
 
223
        windows_utils.WindowsUtils.associate_initiator_with_iscsi_target(
 
224
            volume['provider_location'], initiator_name, )
 
225
 
 
226
        self._mox.StubOutWithMock(windows_utils.WindowsUtils,
 
227
                                  'get_host_information')
 
228
        windows_utils.WindowsUtils.get_host_information(
 
229
            volume, volume['provider_location'])
 
230
 
 
231
        mox.ReplayAll()
 
232
 
 
233
        drv.initialize_connection(volume, connector)
 
234
 
 
235
        mox.VerifyAll()
 
236
 
 
237
    def test_terminate_connection(self):
 
238
        mox = self._mox
 
239
        drv = self._driver
 
240
 
 
241
        volume = db_fakes.get_fake_volume_info()
 
242
        initiator_name = "%s%s" % (CONF.iscsi_target_prefix, volume['name'])
 
243
        connector = db_fakes.get_fake_connector_info()
 
244
 
 
245
        self._mox.StubOutWithMock(windows_utils.WindowsUtils,
 
246
                                  'delete_iscsi_target')
 
247
        windows_utils.WindowsUtils.delete_iscsi_target(
 
248
            initiator_name, volume['provider_location'])
 
249
 
 
250
        mox.ReplayAll()
 
251
 
 
252
        drv.terminate_connection(volume, connector)
 
253
 
 
254
        mox.VerifyAll()
211
255
 
212
256
    def test_ensure_export(self):
213
 
        #Create a volume
214
 
        self._create_volume()
215
 
 
216
 
        self._drv.ensure_export({}, self._volume_data)
 
257
        mox = self._mox
 
258
        drv = self._driver
 
259
 
 
260
        volume = db_fakes.get_fake_volume_info()
 
261
 
 
262
        initiator_name = "%s%s" % (CONF.iscsi_target_prefix, volume['name'])
 
263
 
 
264
        self._mox.StubOutWithMock(windows_utils.WindowsUtils,
 
265
                                  'create_iscsi_target')
 
266
        windows_utils.WindowsUtils.create_iscsi_target(initiator_name, True)
 
267
        self._mox.StubOutWithMock(windows_utils.WindowsUtils,
 
268
                                  'add_disk_to_target')
 
269
        windows_utils.WindowsUtils.add_disk_to_target(volume['name'],
 
270
                                                      initiator_name)
 
271
 
 
272
        mox.ReplayAll()
 
273
 
 
274
        drv.ensure_export(None, volume)
 
275
 
 
276
        mox.VerifyAll()
217
277
 
218
278
    def test_remove_export(self):
219
 
        #Create a volume
220
 
        self._create_volume()
221
 
 
222
 
        self._drv.create_export({}, self._volume_data)
223
 
 
224
 
        self._drv.remove_export({}, self._volume_data)
 
279
        mox = self._mox
 
280
        drv = self._driver
 
281
 
 
282
        volume = db_fakes.get_fake_volume_info()
 
283
 
 
284
        target_name = volume['provider_location']
 
285
 
 
286
        self._mox.StubOutWithMock(windows_utils.WindowsUtils,
 
287
                                  'remove_iscsi_target')
 
288
        windows_utils.WindowsUtils.remove_iscsi_target(target_name)
 
289
 
 
290
        mox.ReplayAll()
 
291
 
 
292
        drv.remove_export(None, volume)
 
293
 
 
294
        mox.VerifyAll()
 
295
 
 
296
    def test_copy_image_to_volume(self):
 
297
        """resize_image common case usage."""
 
298
        mox = self._mox
 
299
        drv = self._driver
 
300
 
 
301
        volume = db_fakes.get_fake_volume_info()
 
302
 
 
303
        self.stubs.Set(drv, 'local_path', self.fake_local_path)
 
304
 
 
305
        mox.StubOutWithMock(image_utils, 'fetch_to_vhd')
 
306
        image_utils.fetch_to_vhd(None, None, None,
 
307
                                 self.fake_local_path(volume))
 
308
 
 
309
        mox.ReplayAll()
 
310
 
 
311
        drv.copy_image_to_volume(None, volume, None, None)
 
312
 
 
313
        mox.VerifyAll()
 
314
 
 
315
    def test_copy_volume_to_image(self):
 
316
        mox = self._mox
 
317
        drv = self._driver
 
318
 
 
319
        vol = db_fakes.get_fake_volume_info()
 
320
 
 
321
        image_meta = db_fakes.get_fake_image_meta()
 
322
 
 
323
        self.stubs.Set(drv, 'local_path', self.fake_local_path)
 
324
 
 
325
        mox.StubOutWithMock(image_utils, 'upload_volume')
 
326
 
 
327
        temp_vhd_path = os.path.join(CONF.image_conversion_dir,
 
328
                                     str(image_meta['id']) + ".vhd")
 
329
 
 
330
        image_utils.upload_volume(None, None, image_meta, temp_vhd_path, 'vpc')
 
331
 
 
332
        self._mox.StubOutWithMock(windows_utils.WindowsUtils,
 
333
                                  'copy_vhd_disk')
 
334
 
 
335
        windows_utils.WindowsUtils.copy_vhd_disk(self.fake_local_path(vol),
 
336
                                                 temp_vhd_path)
 
337
 
 
338
        mox.ReplayAll()
 
339
 
 
340
        drv.copy_volume_to_image(None, vol, None, image_meta)
 
341
 
 
342
        mox.VerifyAll()
 
343
 
 
344
    def test_create_cloned_volume(self):
 
345
        mox = self._mox
 
346
        drv = self._driver
 
347
 
 
348
        volume = db_fakes.get_fake_volume_info()
 
349
        volume_cloned = db_fakes.get_fake_volume_info_cloned()
 
350
 
 
351
        self._mox.StubOutWithMock(windows_utils.WindowsUtils,
 
352
                                  'create_volume')
 
353
 
 
354
        windows_utils.WindowsUtils.create_volume(IgnoreArg(), IgnoreArg(),
 
355
                                                 IgnoreArg())
 
356
 
 
357
        self._mox.StubOutWithMock(windows_utils.WindowsUtils,
 
358
                                  'copy_vhd_disk')
 
359
        windows_utils.WindowsUtils.copy_vhd_disk(self.fake_local_path(
 
360
            volume_cloned), self.fake_local_path(volume))
 
361
 
 
362
        mox.ReplayAll()
 
363
 
 
364
        drv.create_cloned_volume(volume, volume_cloned)
 
365
 
 
366
        mox.VerifyAll()
 
367
 
 
368
    def test_extend_volume(self):
 
369
        mox = self._mox
 
370
        drv = self._driver
 
371
 
 
372
        volume = db_fakes.get_fake_volume_info()
 
373
 
 
374
        TEST_VOLUME_ADDITIONAL_SIZE_MB = 1024
 
375
        TEST_VOLUME_ADDITIONAL_SIZE_GB = 1
 
376
 
 
377
        self._mox.StubOutWithMock(windows_utils.WindowsUtils, 'extend')
 
378
 
 
379
        windows_utils.WindowsUtils.extend(volume['name'],
 
380
                                          TEST_VOLUME_ADDITIONAL_SIZE_MB)
 
381
 
 
382
        new_size = volume['size'] + TEST_VOLUME_ADDITIONAL_SIZE_GB
 
383
 
 
384
        mox.ReplayAll()
 
385
 
 
386
        drv.extend_volume(volume, new_size)
 
387
 
 
388
        mox.VerifyAll()