~ubuntu-branches/ubuntu/saucy/cloud-init/saucy

« back to all changes in this revision

Viewing changes to tests/unittests/test_datasource/test_opennebula.py

  • Committer: Scott Moser
  • Date: 2013-09-11 21:04:19 UTC
  • mfrom: (1.4.5)
  • Revision ID: smoser@ubuntu.com-20130911210419-3vt5ze6ph3hu8dz1
* New upstream snapshot.
  * Add OpenNebula datasource.
  * Support reading 'random_seed' from metadata and writing to /dev/urandom
  * fix for bug in log_time.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
from cloudinit.sources import DataSourceOpenNebula as ds
 
2
from cloudinit import helpers
 
3
from cloudinit import util
 
4
from mocker import MockerTestCase
 
5
from tests.unittests.helpers import populate_dir
 
6
 
 
7
import os
 
8
import pwd
 
9
 
 
10
TEST_VARS = {
 
11
    'VAR1': 'single',
 
12
    'VAR2': 'double word',
 
13
    'VAR3': 'multi\nline\n',
 
14
    'VAR4': "'single'",
 
15
    'VAR5': "'double word'",
 
16
    'VAR6': "'multi\nline\n'",
 
17
    'VAR7': 'single\\t',
 
18
    'VAR8': 'double\\tword',
 
19
    'VAR9': 'multi\\t\nline\n',
 
20
    'VAR10': '\\',  # expect \
 
21
    'VAR11': '\'',  # expect '
 
22
    'VAR12': '$',   # expect $
 
23
}
 
24
 
 
25
INVALID_CONTEXT = ';'
 
26
USER_DATA = '#cloud-config\napt_upgrade: true'
 
27
SSH_KEY = 'ssh-rsa AAAAB3NzaC1....sIkJhq8wdX+4I3A4cYbYP ubuntu@server-460-%i'
 
28
HOSTNAME = 'foo.example.com'
 
29
PUBLIC_IP = '10.0.0.3'
 
30
 
 
31
CMD_IP_OUT = '''\
 
32
1: lo: <LOOPBACK,UP,LOWER_UP> mtu 16436 qdisc noqueue state UNKNOWN
 
33
    link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00
 
34
2: eth0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc mq state UP qlen 1000
 
35
    link/ether 02:00:0a:12:01:01 brd ff:ff:ff:ff:ff:ff
 
36
'''
 
37
 
 
38
 
 
39
class TestOpenNebulaDataSource(MockerTestCase):
 
40
    parsed_user = None
 
41
 
 
42
    def setUp(self):
 
43
        super(TestOpenNebulaDataSource, self).setUp()
 
44
        self.tmp = self.makeDir()
 
45
        self.paths = helpers.Paths({'cloud_dir': self.tmp})
 
46
 
 
47
        # defaults for few tests
 
48
        self.ds = ds.DataSourceOpenNebula
 
49
        self.seed_dir = os.path.join(self.paths.seed_dir, "opennebula")
 
50
        self.sys_cfg = {'datasource': {'OpenNebula': {'dsmode': 'local'}}}
 
51
 
 
52
        # we don't want 'sudo' called in tests. so we patch switch_user_cmd
 
53
        def my_switch_user_cmd(user):
 
54
            self.parsed_user = user
 
55
            return []
 
56
 
 
57
        self.switch_user_cmd_real = ds.switch_user_cmd
 
58
        ds.switch_user_cmd = my_switch_user_cmd
 
59
 
 
60
    def tearDown(self):
 
61
        ds.switch_user_cmd = self.switch_user_cmd_real
 
62
        super(TestOpenNebulaDataSource, self).tearDown()
 
63
 
 
64
    def test_get_data_non_contextdisk(self):
 
65
        orig_find_devs_with = util.find_devs_with
 
66
        try:
 
67
            # dont' try to lookup for CDs
 
68
            util.find_devs_with = lambda n: []
 
69
            dsrc = self.ds(sys_cfg=self.sys_cfg, distro=None, paths=self.paths)
 
70
            ret = dsrc.get_data()
 
71
            self.assertFalse(ret)
 
72
        finally:
 
73
            util.find_devs_with = orig_find_devs_with
 
74
 
 
75
    def test_get_data_broken_contextdisk(self):
 
76
        orig_find_devs_with = util.find_devs_with
 
77
        try:
 
78
            # dont' try to lookup for CDs
 
79
            util.find_devs_with = lambda n: []
 
80
            populate_dir(self.seed_dir, {'context.sh': INVALID_CONTEXT})
 
81
            dsrc = self.ds(sys_cfg=self.sys_cfg, distro=None, paths=self.paths)
 
82
            self.assertRaises(ds.BrokenContextDiskDir, dsrc.get_data)
 
83
        finally:
 
84
            util.find_devs_with = orig_find_devs_with
 
85
 
 
86
    def test_get_data_invalid_identity(self):
 
87
        orig_find_devs_with = util.find_devs_with
 
88
        try:
 
89
            # generate non-existing system user name
 
90
            sys_cfg = self.sys_cfg
 
91
            invalid_user = 'invalid'
 
92
            while not sys_cfg['datasource']['OpenNebula'].get('parseuser'):
 
93
                try:
 
94
                    pwd.getpwnam(invalid_user)
 
95
                    invalid_user += 'X'
 
96
                except KeyError:
 
97
                    sys_cfg['datasource']['OpenNebula']['parseuser'] = \
 
98
                        invalid_user
 
99
 
 
100
            # dont' try to lookup for CDs
 
101
            util.find_devs_with = lambda n: []
 
102
            populate_context_dir(self.seed_dir, {'KEY1': 'val1'})
 
103
            dsrc = self.ds(sys_cfg=sys_cfg, distro=None, paths=self.paths)
 
104
            self.assertRaises(ds.BrokenContextDiskDir, dsrc.get_data)
 
105
        finally:
 
106
            util.find_devs_with = orig_find_devs_with
 
107
 
 
108
    def test_get_data(self):
 
109
        orig_find_devs_with = util.find_devs_with
 
110
        try:
 
111
            # dont' try to lookup for CDs
 
112
            util.find_devs_with = lambda n: []
 
113
            populate_context_dir(self.seed_dir, {'KEY1': 'val1'})
 
114
            dsrc = self.ds(sys_cfg=self.sys_cfg, distro=None, paths=self.paths)
 
115
            ret = dsrc.get_data()
 
116
            self.assertTrue(ret)
 
117
        finally:
 
118
            util.find_devs_with = orig_find_devs_with
 
119
 
 
120
    def test_seed_dir_non_contextdisk(self):
 
121
        self.assertRaises(ds.NonContextDiskDir, ds.read_context_disk_dir,
 
122
                          self.seed_dir)
 
123
 
 
124
    def test_seed_dir_empty1_context(self):
 
125
        populate_dir(self.seed_dir, {'context.sh': ''})
 
126
        results = ds.read_context_disk_dir(self.seed_dir)
 
127
 
 
128
        self.assertEqual(results['userdata'], None)
 
129
        self.assertEqual(results['metadata'], {})
 
130
 
 
131
    def test_seed_dir_empty2_context(self):
 
132
        populate_context_dir(self.seed_dir, {})
 
133
        results = ds.read_context_disk_dir(self.seed_dir)
 
134
 
 
135
        self.assertEqual(results['userdata'], None)
 
136
        self.assertEqual(results['metadata'], {})
 
137
 
 
138
    def test_seed_dir_broken_context(self):
 
139
        populate_dir(self.seed_dir, {'context.sh': INVALID_CONTEXT})
 
140
 
 
141
        self.assertRaises(ds.BrokenContextDiskDir,
 
142
                          ds.read_context_disk_dir,
 
143
                          self.seed_dir)
 
144
 
 
145
    def test_context_parser(self):
 
146
        populate_context_dir(self.seed_dir, TEST_VARS)
 
147
        results = ds.read_context_disk_dir(self.seed_dir)
 
148
 
 
149
        self.assertTrue('metadata' in results)
 
150
        self.assertEqual(TEST_VARS, results['metadata'])
 
151
 
 
152
    def test_ssh_key(self):
 
153
        public_keys = ['first key', 'second key']
 
154
        for c in range(4):
 
155
            for k in ('SSH_KEY', 'SSH_PUBLIC_KEY'):
 
156
                my_d = os.path.join(self.tmp, "%s-%i" % (k, c))
 
157
                populate_context_dir(my_d, {k: '\n'.join(public_keys)})
 
158
                results = ds.read_context_disk_dir(my_d)
 
159
 
 
160
                self.assertTrue('metadata' in results)
 
161
                self.assertTrue('public-keys' in results['metadata'])
 
162
                self.assertEqual(public_keys,
 
163
                                 results['metadata']['public-keys'])
 
164
 
 
165
            public_keys.append(SSH_KEY % (c + 1,))
 
166
 
 
167
    def test_user_data(self):
 
168
        for k in ('USER_DATA', 'USERDATA'):
 
169
            my_d = os.path.join(self.tmp, k)
 
170
            populate_context_dir(my_d, {k: USER_DATA})
 
171
            results = ds.read_context_disk_dir(my_d)
 
172
 
 
173
            self.assertTrue('userdata' in results)
 
174
            self.assertEqual(USER_DATA, results['userdata'])
 
175
 
 
176
    def test_hostname(self):
 
177
        for k in ('HOSTNAME', 'PUBLIC_IP', 'IP_PUBLIC', 'ETH0_IP'):
 
178
            my_d = os.path.join(self.tmp, k)
 
179
            populate_context_dir(my_d, {k: PUBLIC_IP})
 
180
            results = ds.read_context_disk_dir(my_d)
 
181
 
 
182
            self.assertTrue('metadata' in results)
 
183
            self.assertTrue('local-hostname' in results['metadata'])
 
184
            self.assertEqual(PUBLIC_IP, results['metadata']['local-hostname'])
 
185
 
 
186
    def test_network_interfaces(self):
 
187
        populate_context_dir(self.seed_dir, {'ETH0_IP': '1.2.3.4'})
 
188
        results = ds.read_context_disk_dir(self.seed_dir)
 
189
 
 
190
        self.assertTrue('network-interfaces' in results)
 
191
 
 
192
    def test_find_candidates(self):
 
193
        def my_devs_with(criteria):
 
194
            return {
 
195
                "LABEL=CONTEXT": ["/dev/sdb"],
 
196
                "LABEL=CDROM": ["/dev/sr0"],
 
197
                "TYPE=iso9660": ["/dev/vdb"],
 
198
            }.get(criteria, [])
 
199
 
 
200
        orig_find_devs_with = util.find_devs_with
 
201
        try:
 
202
            util.find_devs_with = my_devs_with
 
203
            self.assertEqual(["/dev/sdb", "/dev/sr0", "/dev/vdb"],
 
204
                             ds.find_candidate_devs())
 
205
        finally:
 
206
            util.find_devs_with = orig_find_devs_with
 
207
 
 
208
 
 
209
class TestOpenNebulaNetwork(MockerTestCase):
 
210
 
 
211
    def setUp(self):
 
212
        super(TestOpenNebulaNetwork, self).setUp()
 
213
 
 
214
    def test_lo(self):
 
215
        net = ds.OpenNebulaNetwork('', {})
 
216
        self.assertEqual(net.gen_conf(), u'''\
 
217
auto lo
 
218
iface lo inet loopback
 
219
''')
 
220
 
 
221
    def test_eth0(self):
 
222
        net = ds.OpenNebulaNetwork(CMD_IP_OUT, {})
 
223
        self.assertEqual(net.gen_conf(), u'''\
 
224
auto lo
 
225
iface lo inet loopback
 
226
 
 
227
auto eth0
 
228
iface eth0 inet static
 
229
  address 10.18.1.1
 
230
  network 10.18.1.0
 
231
  netmask 255.255.255.0
 
232
''')
 
233
 
 
234
    def test_eth0_override(self):
 
235
        context = {
 
236
            'DNS': '1.2.3.8',
 
237
            'ETH0_IP': '1.2.3.4',
 
238
            'ETH0_NETWORK': '1.2.3.0',
 
239
            'ETH0_MASK': '255.255.0.0',
 
240
            'ETH0_GATEWAY': '1.2.3.5',
 
241
            'ETH0_DOMAIN': 'example.com',
 
242
            'ETH0_DNS': '1.2.3.6 1.2.3.7'
 
243
        }
 
244
 
 
245
        net = ds.OpenNebulaNetwork(CMD_IP_OUT, context)
 
246
        self.assertEqual(net.gen_conf(), u'''\
 
247
auto lo
 
248
iface lo inet loopback
 
249
 
 
250
auto eth0
 
251
iface eth0 inet static
 
252
  address 1.2.3.4
 
253
  network 1.2.3.0
 
254
  netmask 255.255.0.0
 
255
  gateway 1.2.3.5
 
256
  dns-search example.com
 
257
  dns-nameservers 1.2.3.8 1.2.3.6 1.2.3.7
 
258
''')
 
259
 
 
260
 
 
261
def populate_context_dir(path, variables):
 
262
    data = "# Context variables generated by OpenNebula\n"
 
263
    for (k, v) in variables.iteritems():
 
264
        data += ("%s='%s'\n" % (k.upper(), v.replace(r"'", r"'\''")))
 
265
    populate_dir(path, {'context.sh': data})
 
266
 
 
267
# vi: ts=4 expandtab