~ubuntu-branches/ubuntu/raring/nova/raring-proposed

« back to all changes in this revision

Viewing changes to nova/tests/test_netapp_nfs.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short, Adam Gandelman, Chuck Short
  • Date: 2012-11-23 09:04:58 UTC
  • mfrom: (1.1.66)
  • Revision ID: package-import@ubuntu.com-20121123090458-91565o7aev1i1h71
Tags: 2013.1~g1-0ubuntu1
[ Adam Gandelman ]
* debian/control: Ensure novaclient is upgraded with nova,
  require python-keystoneclient >= 1:2.9.0. (LP: #1073289)
* debian/patches/{ubuntu/*, rbd-security.patch}: Dropped, applied
  upstream.
* debian/control: Add python-testtools to Build-Depends.

[ Chuck Short ]
* New upstream version.
* Refreshed debian/patches/avoid_setuptools_git_dependency.patch.
* debian/rules: FTBFS if missing binaries.
* debian/nova-scheudler.install: Add missing rabbit-queues and
  nova-rpc-zmq-receiver.
* Remove nova-volume since it doesnt exist anymore, transition to cinder-*.
* debian/rules: install apport hook in the right place.
* debian/patches/ubuntu-show-tests.patch: Display test failures.
* debian/control: Add depends on genisoimage
* debian/control: Suggest guestmount.
* debian/control: Suggest websockify. (LP: #1076442)
* debian/nova.conf: Disable nova-volume service.
* debian/control: Depend on xen-system-* rather than the hypervisor.
* debian/control, debian/mans/nova-conductor.8, debian/nova-conductor.init,
  debian/nova-conductor.install, debian/nova-conductor.logrotate
  debian/nova-conductor.manpages, debian/nova-conductor.postrm
  debian/nova-conductor.upstart.in: Add nova-conductor service.
* debian/control: Add python-fixtures as a build deps.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# vim: tabstop=4 shiftwidth=4 softtabstop=4
2
 
 
3
 
# Copyright (c) 2012 NetApp, Inc.
4
 
# All Rights Reserved.
5
 
#
6
 
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
7
 
#    not use this file except in compliance with the License. You may obtain
8
 
#    a copy of the License at
9
 
#
10
 
#         http://www.apache.org/licenses/LICENSE-2.0
11
 
#
12
 
#    Unless required by applicable law or agreed to in writing, software
13
 
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14
 
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15
 
#    License for the specific language governing permissions and limitations
16
 
#    under the License.
17
 
"""Unit tests for the NetApp-specific NFS driver module (netapp_nfs)"""
18
 
 
19
 
from nova import context
20
 
from nova import exception
21
 
from nova import test
22
 
 
23
 
from nova.volume import netapp
24
 
from nova.volume import netapp_nfs
25
 
from nova.volume import nfs
26
 
 
27
 
from mox import IgnoreArg
28
 
from mox import IsA
29
 
from mox import MockObject
30
 
 
31
 
import mox
32
 
import suds
33
 
import types
34
 
 
35
 
 
36
 
class FakeVolume(object):
37
 
    def __init__(self, size=0):
38
 
        self.size = size
39
 
        self.id = hash(self)
40
 
        self.name = None
41
 
 
42
 
    def __getitem__(self, key):
43
 
        return self.__dict__[key]
44
 
 
45
 
 
46
 
class FakeSnapshot(object):
47
 
    def __init__(self, volume_size=0):
48
 
        self.volume_name = None
49
 
        self.name = None
50
 
        self.volume_id = None
51
 
        self.volume_size = volume_size
52
 
        self.user_id = None
53
 
        self.status = None
54
 
 
55
 
    def __getitem__(self, key):
56
 
        return self.__dict__[key]
57
 
 
58
 
 
59
 
class FakeResponce(object):
60
 
    def __init__(self, status):
61
 
        """
62
 
        :param status: Either 'failed' or 'passed'
63
 
        """
64
 
        self.Status = status
65
 
 
66
 
        if status == 'failed':
67
 
            self.Reason = 'Sample error'
68
 
 
69
 
 
70
 
class NetappNfsDriverTestCase(test.TestCase):
71
 
    """Test case for NetApp specific NFS clone driver"""
72
 
 
73
 
    def setUp(self):
74
 
        self._driver = netapp_nfs.NetAppNFSDriver()
75
 
        self._mox = mox.Mox()
76
 
 
77
 
    def tearDown(self):
78
 
        self._mox.UnsetStubs()
79
 
 
80
 
    def test_check_for_setup_error(self):
81
 
        mox = self._mox
82
 
        drv = self._driver
83
 
        required_flags = [
84
 
                'netapp_wsdl_url',
85
 
                'netapp_login',
86
 
                'netapp_password',
87
 
                'netapp_server_hostname',
88
 
                'netapp_server_port'
89
 
            ]
90
 
 
91
 
        # check exception raises when flags are not set
92
 
        self.assertRaises(exception.NovaException,
93
 
                          drv.check_for_setup_error)
94
 
 
95
 
        # set required flags
96
 
        for flag in required_flags:
97
 
            setattr(netapp.FLAGS, flag, 'val')
98
 
 
99
 
        mox.StubOutWithMock(nfs.NfsDriver, 'check_for_setup_error')
100
 
        nfs.NfsDriver.check_for_setup_error()
101
 
        mox.ReplayAll()
102
 
 
103
 
        drv.check_for_setup_error()
104
 
 
105
 
        mox.VerifyAll()
106
 
 
107
 
        # restore initial FLAGS
108
 
        for flag in required_flags:
109
 
            delattr(netapp.FLAGS, flag)
110
 
 
111
 
    def test_do_setup(self):
112
 
        mox = self._mox
113
 
        drv = self._driver
114
 
 
115
 
        mox.StubOutWithMock(drv, 'check_for_setup_error')
116
 
        mox.StubOutWithMock(netapp_nfs.NetAppNFSDriver, '_get_client')
117
 
 
118
 
        drv.check_for_setup_error()
119
 
        netapp_nfs.NetAppNFSDriver._get_client()
120
 
 
121
 
        mox.ReplayAll()
122
 
 
123
 
        drv.do_setup(IsA(context.RequestContext))
124
 
 
125
 
        mox.VerifyAll()
126
 
 
127
 
    def test_create_snapshot(self):
128
 
        """Test snapshot can be created and deleted"""
129
 
        mox = self._mox
130
 
        drv = self._driver
131
 
 
132
 
        mox.StubOutWithMock(drv, '_clone_volume')
133
 
        drv._clone_volume(IgnoreArg(), IgnoreArg(), IgnoreArg())
134
 
        mox.ReplayAll()
135
 
 
136
 
        drv.create_snapshot(FakeSnapshot())
137
 
 
138
 
        mox.VerifyAll()
139
 
 
140
 
    def test_create_volume_from_snapshot(self):
141
 
        """Tests volume creation from snapshot"""
142
 
        drv = self._driver
143
 
        mox = self._mox
144
 
        volume = FakeVolume(1)
145
 
        snapshot = FakeSnapshot(2)
146
 
 
147
 
        self.assertRaises(exception.NovaException,
148
 
                          drv.create_volume_from_snapshot,
149
 
                          volume,
150
 
                          snapshot)
151
 
 
152
 
        snapshot = FakeSnapshot(1)
153
 
 
154
 
        location = '127.0.0.1:/nfs'
155
 
        expected_result = {'provider_location': location}
156
 
        mox.StubOutWithMock(drv, '_clone_volume')
157
 
        mox.StubOutWithMock(drv, '_get_volume_location')
158
 
        drv._clone_volume(IgnoreArg(), IgnoreArg(), IgnoreArg())
159
 
        drv._get_volume_location(IgnoreArg()).AndReturn(location)
160
 
 
161
 
        mox.ReplayAll()
162
 
 
163
 
        loc = drv.create_volume_from_snapshot(volume, snapshot)
164
 
 
165
 
        self.assertEquals(loc, expected_result)
166
 
 
167
 
        mox.VerifyAll()
168
 
 
169
 
    def _prepare_delete_snapshot_mock(self, snapshot_exists):
170
 
        drv = self._driver
171
 
        mox = self._mox
172
 
 
173
 
        mox.StubOutWithMock(drv, '_get_provider_location')
174
 
        mox.StubOutWithMock(drv, '_volume_not_present')
175
 
 
176
 
        if snapshot_exists:
177
 
            mox.StubOutWithMock(drv, '_execute')
178
 
            mox.StubOutWithMock(drv, '_get_volume_path')
179
 
 
180
 
        drv._get_provider_location(IgnoreArg())
181
 
        drv._volume_not_present(IgnoreArg(), IgnoreArg())\
182
 
                                        .AndReturn(not snapshot_exists)
183
 
 
184
 
        if snapshot_exists:
185
 
            drv._get_volume_path(IgnoreArg(), IgnoreArg())
186
 
            drv._execute('rm', None, run_as_root=True)
187
 
 
188
 
        mox.ReplayAll()
189
 
 
190
 
        return mox
191
 
 
192
 
    def test_delete_existing_snapshot(self):
193
 
        drv = self._driver
194
 
        mox = self._prepare_delete_snapshot_mock(True)
195
 
 
196
 
        drv.delete_snapshot(FakeSnapshot())
197
 
 
198
 
        mox.VerifyAll()
199
 
 
200
 
    def test_delete_missing_snapshot(self):
201
 
        drv = self._driver
202
 
        mox = self._prepare_delete_snapshot_mock(False)
203
 
 
204
 
        drv.delete_snapshot(FakeSnapshot())
205
 
 
206
 
        mox.VerifyAll()
207
 
 
208
 
    def _prepare_clone_mock(self, status):
209
 
        drv = self._driver
210
 
        mox = self._mox
211
 
 
212
 
        volume = FakeVolume()
213
 
        setattr(volume, 'provider_location', '127.0.0.1:/nfs')
214
 
 
215
 
        drv._client = MockObject(suds.client.Client)
216
 
        drv._client.factory = MockObject(suds.client.Factory)
217
 
        drv._client.service = MockObject(suds.client.ServiceSelector)
218
 
 
219
 
        # ApiProxy() method is generated by ServiceSelector at runtime from the
220
 
        # XML, so mocking is impossible.
221
 
        setattr(drv._client.service,
222
 
                'ApiProxy',
223
 
                types.MethodType(lambda *args, **kwargs: FakeResponce(status),
224
 
                                 suds.client.ServiceSelector))
225
 
        mox.StubOutWithMock(drv, '_get_host_id')
226
 
        mox.StubOutWithMock(drv, '_get_full_export_path')
227
 
 
228
 
        drv._get_host_id(IgnoreArg()).AndReturn('10')
229
 
        drv._get_full_export_path(IgnoreArg(), IgnoreArg()).AndReturn('/nfs')
230
 
 
231
 
        return mox
232
 
 
233
 
    def test_successfull_clone_volume(self):
234
 
        drv = self._driver
235
 
        mox = self._prepare_clone_mock('passed')
236
 
 
237
 
        mox.ReplayAll()
238
 
 
239
 
        volume_name = 'volume_name'
240
 
        clone_name = 'clone_name'
241
 
        volume_id = volume_name + str(hash(volume_name))
242
 
 
243
 
        drv._clone_volume(volume_name, clone_name, volume_id)
244
 
 
245
 
        mox.VerifyAll()
246
 
 
247
 
    def test_failed_clone_volume(self):
248
 
        drv = self._driver
249
 
        mox = self._prepare_clone_mock('failed')
250
 
 
251
 
        mox.ReplayAll()
252
 
 
253
 
        volume_name = 'volume_name'
254
 
        clone_name = 'clone_name'
255
 
        volume_id = volume_name + str(hash(volume_name))
256
 
 
257
 
        self.assertRaises(exception.NovaException,
258
 
                          drv._clone_volume,
259
 
                          volume_name, clone_name, volume_id)
260
 
 
261
 
        mox.VerifyAll()