~harlowja/cloud-init/ds-openstack

« back to all changes in this revision

Viewing changes to tests/unittests/test_datasource/test_smartos.py

  • Committer: Joshua Harlow
  • Date: 2014-02-07 23:14:26 UTC
  • mfrom: (913.1.22 trunk)
  • Revision ID: harlowja@yahoo-inc.com-20140207231426-2natgf64l4o055du
Remerged with trunk

Show diffs side-by-side

added added

removed removed

Lines of Context:
27
27
from cloudinit.sources import DataSourceSmartOS
28
28
 
29
29
from mocker import MockerTestCase
 
30
import os
 
31
import os.path
 
32
import re
 
33
import stat
30
34
import uuid
31
35
 
32
36
MOCK_RETURNS = {
35
39
    'disable_iptables_flag': None,
36
40
    'enable_motd_sys_info': None,
37
41
    'test-var1': 'some data',
38
 
    'user-data': '\n'.join(['#!/bin/sh', '/bin/true', '']),
 
42
    'cloud-init:user-data': '\n'.join(['#!/bin/sh', '/bin/true', '']),
 
43
    'sdc:datacenter_name': 'somewhere2',
 
44
    'sdc:operator-script': '\n'.join(['bin/true', '']),
 
45
    'user-data': '\n'.join(['something', '']),
 
46
    'user-script': '\n'.join(['/bin/true', '']),
39
47
}
40
48
 
41
49
DMI_DATA_RETURN = (str(uuid.uuid4()), 'smartdc')
101
109
    def setUp(self):
102
110
        # makeDir comes from MockerTestCase
103
111
        self.tmp = self.makeDir()
 
112
        self.legacy_user_d = self.makeDir()
104
113
 
105
114
        # patch cloud_dir, so our 'seed_dir' is guaranteed empty
106
115
        self.paths = helpers.Paths({'cloud_dir': self.tmp})
138
147
            sys_cfg['datasource'] = sys_cfg.get('datasource', {})
139
148
            sys_cfg['datasource']['SmartOS'] = ds_cfg
140
149
 
 
150
        self.apply_patches([(mod, 'LEGACY_USER_D', self.legacy_user_d)])
141
151
        self.apply_patches([(mod, 'get_serial', _get_serial)])
142
152
        self.apply_patches([(mod, 'dmi_data', _dmi_data)])
143
153
        dsrc = mod.DataSourceSmartOS(sys_cfg, distro=None,
194
204
        # metadata provided base64_all of true
195
205
        my_returns = MOCK_RETURNS.copy()
196
206
        my_returns['base64_all'] = "true"
197
 
        for k in ('hostname', 'user-data'):
 
207
        for k in ('hostname', 'cloud-init:user-data'):
198
208
            my_returns[k] = base64.b64encode(my_returns[k])
199
209
 
200
210
        dsrc = self._get_ds(mockdata=my_returns)
202
212
        self.assertTrue(ret)
203
213
        self.assertEquals(MOCK_RETURNS['hostname'],
204
214
                          dsrc.metadata['local-hostname'])
205
 
        self.assertEquals(MOCK_RETURNS['user-data'],
 
215
        self.assertEquals(MOCK_RETURNS['cloud-init:user-data'],
206
216
                          dsrc.userdata_raw)
207
217
        self.assertEquals(MOCK_RETURNS['root_authorized_keys'],
208
218
                          dsrc.metadata['public-keys'])
213
223
 
214
224
    def test_b64_userdata(self):
215
225
        my_returns = MOCK_RETURNS.copy()
216
 
        my_returns['b64-user-data'] = "true"
 
226
        my_returns['b64-cloud-init:user-data'] = "true"
217
227
        my_returns['b64-hostname'] = "true"
218
 
        for k in ('hostname', 'user-data'):
 
228
        for k in ('hostname', 'cloud-init:user-data'):
219
229
            my_returns[k] = base64.b64encode(my_returns[k])
220
230
 
221
231
        dsrc = self._get_ds(mockdata=my_returns)
223
233
        self.assertTrue(ret)
224
234
        self.assertEquals(MOCK_RETURNS['hostname'],
225
235
                          dsrc.metadata['local-hostname'])
226
 
        self.assertEquals(MOCK_RETURNS['user-data'], dsrc.userdata_raw)
 
236
        self.assertEquals(MOCK_RETURNS['cloud-init:user-data'],
 
237
                          dsrc.userdata_raw)
227
238
        self.assertEquals(MOCK_RETURNS['root_authorized_keys'],
228
239
                          dsrc.metadata['public-keys'])
229
240
 
238
249
        self.assertTrue(ret)
239
250
        self.assertEquals(MOCK_RETURNS['hostname'],
240
251
                          dsrc.metadata['local-hostname'])
241
 
        self.assertEquals(MOCK_RETURNS['user-data'], dsrc.userdata_raw)
 
252
        self.assertEquals(MOCK_RETURNS['cloud-init:user-data'],
 
253
                          dsrc.userdata_raw)
242
254
 
243
255
    def test_userdata(self):
244
256
        dsrc = self._get_ds(mockdata=MOCK_RETURNS)
245
257
        ret = dsrc.get_data()
246
258
        self.assertTrue(ret)
247
 
        self.assertEquals(MOCK_RETURNS['user-data'], dsrc.userdata_raw)
 
259
        self.assertEquals(MOCK_RETURNS['user-data'],
 
260
                          dsrc.metadata['legacy-user-data'])
 
261
        self.assertEquals(MOCK_RETURNS['cloud-init:user-data'],
 
262
                          dsrc.userdata_raw)
 
263
 
 
264
    def test_sdc_scripts(self):
 
265
        dsrc = self._get_ds(mockdata=MOCK_RETURNS)
 
266
        ret = dsrc.get_data()
 
267
        self.assertTrue(ret)
 
268
        self.assertEquals(MOCK_RETURNS['user-script'],
 
269
                          dsrc.metadata['user-script'])
 
270
 
 
271
        legacy_script_f = "%s/user-script" % self.legacy_user_d
 
272
        self.assertTrue(os.path.exists(legacy_script_f))
 
273
        self.assertTrue(os.path.islink(legacy_script_f))
 
274
        user_script_perm = oct(os.stat(legacy_script_f)[stat.ST_MODE])[-3:]
 
275
        self.assertEquals(user_script_perm, '700')
 
276
 
 
277
    def test_scripts_shebanged(self):
 
278
        dsrc = self._get_ds(mockdata=MOCK_RETURNS)
 
279
        ret = dsrc.get_data()
 
280
        self.assertTrue(ret)
 
281
        self.assertEquals(MOCK_RETURNS['user-script'],
 
282
                          dsrc.metadata['user-script'])
 
283
 
 
284
        legacy_script_f = "%s/user-script" % self.legacy_user_d
 
285
        self.assertTrue(os.path.exists(legacy_script_f))
 
286
        self.assertTrue(os.path.islink(legacy_script_f))
 
287
        shebang = None
 
288
        with open(legacy_script_f, 'r') as f:
 
289
            shebang = f.readlines()[0].strip()
 
290
        self.assertEquals(shebang, "#!/bin/bash")
 
291
        user_script_perm = oct(os.stat(legacy_script_f)[stat.ST_MODE])[-3:]
 
292
        self.assertEquals(user_script_perm, '700')
 
293
 
 
294
    def test_scripts_shebang_not_added(self):
 
295
        """
 
296
            Test that the SmartOS requirement that plain text scripts
 
297
            are executable. This test makes sure that plain texts scripts
 
298
            with out file magic have it added appropriately by cloud-init.
 
299
        """
 
300
 
 
301
        my_returns = MOCK_RETURNS.copy()
 
302
        my_returns['user-script'] = '\n'.join(['#!/usr/bin/perl',
 
303
                                               'print("hi")', ''])
 
304
 
 
305
        dsrc = self._get_ds(mockdata=my_returns)
 
306
        ret = dsrc.get_data()
 
307
        self.assertTrue(ret)
 
308
        self.assertEquals(my_returns['user-script'],
 
309
                          dsrc.metadata['user-script'])
 
310
 
 
311
        legacy_script_f = "%s/user-script" % self.legacy_user_d
 
312
        self.assertTrue(os.path.exists(legacy_script_f))
 
313
        self.assertTrue(os.path.islink(legacy_script_f))
 
314
        shebang = None
 
315
        with open(legacy_script_f, 'r') as f:
 
316
            shebang = f.readlines()[0].strip()
 
317
        self.assertEquals(shebang, "#!/usr/bin/perl")
 
318
 
 
319
    def test_scripts_removed(self):
 
320
        """
 
321
            Since SmartOS requires that the user script is fetched
 
322
            each boot, we want to make sure that the information
 
323
            is backed-up for user-review later.
 
324
 
 
325
            This tests the behavior of when a script is removed. It makes
 
326
            sure that a) the previous script is backed-up; and 2) that
 
327
            there is no script remaining.
 
328
        """
 
329
 
 
330
        script_d = os.path.join(self.tmp, "scripts", "per-boot")
 
331
        os.makedirs(script_d)
 
332
 
 
333
        test_script_f = "%s/99_user_script" % script_d
 
334
        with open(test_script_f, 'w') as f:
 
335
            f.write("TEST DATA")
 
336
 
 
337
        my_returns = MOCK_RETURNS.copy()
 
338
        del my_returns['user-script']
 
339
 
 
340
        dsrc = self._get_ds(mockdata=my_returns)
 
341
        ret = dsrc.get_data()
 
342
        self.assertTrue(ret)
 
343
        self.assertFalse(dsrc.metadata['user-script'])
 
344
        self.assertFalse(os.path.exists(test_script_f))
 
345
 
 
346
    def test_userdata_removed(self):
 
347
        """
 
348
            User-data in the SmartOS world is supposed to be written to a file
 
349
            each and every boot. This tests to make sure that in the event the
 
350
            legacy user-data is removed, the existing user-data is backed-up and
 
351
            there is no /var/db/user-data left.
 
352
        """
 
353
 
 
354
        user_data_f = "%s/mdata-user-data" % self.legacy_user_d
 
355
        with open(user_data_f, 'w') as f:
 
356
            f.write("PREVIOUS")
 
357
 
 
358
        my_returns = MOCK_RETURNS.copy()
 
359
        del my_returns['user-data']
 
360
 
 
361
        dsrc = self._get_ds(mockdata=my_returns)
 
362
        ret = dsrc.get_data()
 
363
        self.assertTrue(ret)
 
364
        self.assertFalse(dsrc.metadata.get('legacy-user-data'))
 
365
 
 
366
        found_new = False
 
367
        for root, _dirs, files in os.walk(self.legacy_user_d):
 
368
            for name in files:
 
369
                name_f = os.path.join(root, name)
 
370
                permissions = oct(os.stat(name_f)[stat.ST_MODE])[-3:]
 
371
                if re.match(r'.*\/mdata-user-data$', name_f):
 
372
                    found_new = True
 
373
                    print name_f
 
374
                    self.assertEquals(permissions, '400')
 
375
 
 
376
        self.assertFalse(found_new)
248
377
 
249
378
    def test_disable_iptables_flag(self):
250
379
        dsrc = self._get_ds(mockdata=MOCK_RETURNS)