~ubuntu-cloud-archive/ubuntu/precise/nova/trunk

« back to all changes in this revision

Viewing changes to nova/tests/test_rbd.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short, Adam Gandelman, Chuck Short, Vishvananda Ishaya
  • Date: 2012-09-20 07:45:50 UTC
  • mfrom: (1.1.62)
  • Revision ID: package-import@ubuntu.com-20120920074550-fzmmmzqcntnw1vu7
Tags: 2012.2~rc1-0ubuntu1
[ Adam Gandelman ]
* Ensure /etc/nova/rootwrap.d/ is only writable by root, ensure
  those permissions on /etc/nova/rootwrap.conf as well as
  all individual filter configurations.

[ Chuck Short ]
* Fix lintian warnings
* debian/*.logrotate: compress logfiles when they are rotated. (LP:
  #1049915)
* debian/control: 
  - Suggest ceph-common for nova-volume.
  - Add python-cinderclient as a build depends.

[ Vishvananda Ishaya ]
* Split up vncproxy and xvpvncproxy.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# vim: tabstop=4 shiftwidth=4 softtabstop=4
 
2
 
 
3
# Copyright 2012 Josh Durgin
 
4
# All Rights Reserved.
 
5
#
 
6
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
 
7
#    not use this file except in compliance with the License. You may obtain
 
8
#    a copy of the License at
 
9
#
 
10
#         http://www.apache.org/licenses/LICENSE-2.0
 
11
#
 
12
#    Unless required by applicable law or agreed to in writing, software
 
13
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 
14
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 
15
#    License for the specific language governing permissions and limitations
 
16
#    under the License.
 
17
 
 
18
from nova import db
 
19
from nova import exception
 
20
from nova.openstack.common import log as logging
 
21
from nova.openstack.common import timeutils
 
22
from nova import test
 
23
from nova.tests.image import fake as fake_image
 
24
from nova.tests.test_volume import DriverTestCase
 
25
from nova.volume.driver import RBDDriver
 
26
 
 
27
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
 
28
 
 
29
 
 
30
class RBDTestCase(test.TestCase):
    """Tests for RBDDriver location parsing and clone-eligibility checks."""

    def setUp(self):
        """Create an RBDDriver wired to a no-op execute callable."""
        super(RBDTestCase, self).setUp()

        def fake_execute(*args, **kwargs):
            # No-op replacement handed to the driver as its ``execute``
            # callable; tolerates positional and keyword arguments alike.
            pass
        self.driver = RBDDriver(execute=fake_execute)

    def test_good_locations(self):
        """Well-formed rbd:// locations must parse without raising."""
        locations = [
            'rbd://fsid/pool/image/snap',
            'rbd://%2F/%2F/%2F/%2F',
            ]
        # Iterate explicitly rather than calling map() for its side
        # effects: map() is lazy on Python 3, so the parser would never
        # actually be invoked there and the test would pass vacuously.
        for loc in locations:
            self.driver._parse_location(loc)

    def test_bad_locations(self):
        """Malformed locations raise ImageUnacceptable and are uncloneable."""
        locations = [
            'rbd://image',
            'http://path/to/somewhere/else',
            'rbd://image/extra',
            'rbd://image/',
            'rbd://fsid/pool/image/',
            'rbd://fsid/pool/image/snap/',
            'rbd://///',
            ]
        for loc in locations:
            self.assertRaises(exception.ImageUnacceptable,
                              self.driver._parse_location,
                              loc)
            self.assertFalse(self.driver._is_cloneable(loc))

    def test_cloneable(self):
        """A location whose fsid equals the stubbed fsid is cloneable."""
        self.stubs.Set(self.driver, '_get_fsid', lambda: 'abc')
        location = 'rbd://abc/pool/image/snap'
        self.assertTrue(self.driver._is_cloneable(location))

    def test_uncloneable_different_fsid(self):
        """A location whose fsid differs from the stubbed one is rejected."""
        self.stubs.Set(self.driver, '_get_fsid', lambda: 'abc')
        location = 'rbd://def/pool/image/snap'
        self.assertFalse(self.driver._is_cloneable(location))

    def test_uncloneable_unreadable(self):
        """A matching fsid is still uncloneable when _execute fails."""
        def fake_exc(*args, **kwargs):
            # Fail every command regardless of how it is invoked.
            raise exception.ProcessExecutionError()
        self.stubs.Set(self.driver, '_get_fsid', lambda: 'abc')
        self.stubs.Set(self.driver, '_execute', fake_exc)
        location = 'rbd://abc/pool/image/snap'
        self.assertFalse(self.driver._is_cloneable(location))
 
79
 
 
80
 
 
81
class FakeRBDDriver(RBDDriver):
    """RBDDriver subclass whose ``_clone`` and ``_resize`` do nothing.

    Both methods accept arbitrary arguments so that a caller passing
    arguments gets a no-op rather than a TypeError.
    NOTE(review): the real signatures live in RBDDriver, which is not
    visible from this file -- confirm against nova.volume.driver.
    """

    def _clone(self, *args, **kwargs):
        """Do nothing, whatever arguments are supplied."""
        pass

    def _resize(self, *args, **kwargs):
        """Do nothing, whatever arguments are supplied."""
        pass
 
88
 
 
89
 
 
90
class ManagedRBDTestCase(DriverTestCase):
    """Tests that go through ``self.volume`` with FakeRBDDriver configured."""
    driver_name = "nova.tests.test_rbd.FakeRBDDriver"

    def setUp(self):
        """Run the base set-up, then stub out the image service."""
        super(ManagedRBDTestCase, self).setUp()
        fake_image.stub_out_image_service(self.stubs)

    def _clone_volume_from_image(self, expected_status,
                                 clone_works=True):
        """Create a volume from an image and check its status afterwards.

        :param expected_status: status the volume record must hold once
                                the create attempt has finished
        :param clone_works: when True the stubbed ``clone_image`` returns
                            normally; when False it raises NovaException
                            and ``create_volume`` is expected to raise too
        """
        def fake_clone_image(volume, image_location):
            pass

        def fake_clone_error(volume, image_location):
            raise exception.NovaException()

        self.stubs.Set(self.volume.driver, '_is_cloneable', lambda x: True)
        # Pick the success or failure stub once instead of duplicating
        # the stubs.Set call in both branches.
        clone_stub = fake_clone_image if clone_works else fake_clone_error
        self.stubs.Set(self.volume.driver, 'clone_image', clone_stub)

        image_id = 'c905cedb-7281-47e4-8a62-f26bc5fc4c77'
        volume_id = 1
        # creating volume testdata
        volume_values = {'id': volume_id,
                         'updated_at': timeutils.utcnow(),
                         'display_description': 'Test Desc',
                         'size': 20,
                         'status': 'creating',
                         'instance_uuid': None,
                         'host': 'dummy'}
        db.volume_create(self.context, volume_values)
        try:
            if clone_works:
                self.volume.create_volume(self.context,
                                          volume_id,
                                          image_id=image_id)
            else:
                self.assertRaises(exception.NovaException,
                                  self.volume.create_volume,
                                  self.context,
                                  volume_id,
                                  image_id=image_id)

            volume = db.volume_get(self.context, volume_id)
            self.assertEqual(volume['status'], expected_status)
        finally:
            # cleanup: remove the test record whether or not the
            # assertions above passed
            db.volume_destroy(self.context, volume_id)

    def test_clone_image_status_available(self):
        """A volume whose image clone succeeds ends up 'available'."""
        self._clone_volume_from_image('available', True)

    def test_clone_image_status_error(self):
        """A volume whose image clone raises ends up in the 'error' state."""
        self._clone_volume_from_image('error', False)

    def test_clone_success(self):
        """clone_image reports success for a cloneable image."""
        self.stubs.Set(self.volume.driver, '_is_cloneable', lambda x: True)
        # NOTE(review): clone_image itself is replaced by a stub returning
        # True, so the assertion below exercises that stub rather than the
        # driver's own clone_image implementation.
        self.stubs.Set(self.volume.driver, 'clone_image', lambda a, b: True)
        image_id = 'c905cedb-7281-47e4-8a62-f26bc5fc4c77'
        self.assertTrue(self.volume.driver.clone_image({}, image_id))

    def test_clone_bad_image_id(self):
        """clone_image returns a false value when given None."""
        self.stubs.Set(self.volume.driver, '_is_cloneable', lambda x: True)
        self.assertFalse(self.volume.driver.clone_image({}, None))

    def test_clone_uncloneable(self):
        """clone_image returns a false value when _is_cloneable is False."""
        self.stubs.Set(self.volume.driver, '_is_cloneable', lambda x: False)
        self.assertFalse(self.volume.driver.clone_image({}, 'dne'))