1
# vim: tabstop=4 shiftwidth=4 softtabstop=4
3
# Copyright 2011 Justin Santa Barbara
5
# Licensed under the Apache License, Version 2.0 (the "License"); you may
6
# not use this file except in compliance with the License. You may obtain
7
# a copy of the License at
9
# http://www.apache.org/licenses/LICENSE-2.0
11
# Unless required by applicable law or agreed to in writing, software
12
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14
# License for the specific language governing permissions and limitations
28
from cinder import exception
29
from cinder import flags
30
from cinder.openstack.common import timeutils
31
from cinder import test
32
from cinder import utils
38
class ExecuteTestCase(test.TestCase):
39
def test_retry_on_failure(self):
40
fd, tmpfilename = tempfile.mkstemp()
41
_, tmpfilename2 = tempfile.mkstemp()
43
fp = os.fdopen(fd, 'w+')
45
# If stdin fails to get passed during one of the runs, make a note.
50
# If stdin has failed to get passed during this or a previous run, exit early.
65
os.chmod(tmpfilename, 0755)
66
self.assertRaises(exception.ProcessExecutionError,
68
tmpfilename, tmpfilename2, attempts=10,
71
fp = open(tmpfilename2, 'r+')
74
self.assertNotEquals(runs.strip(), 'failure', 'stdin did not '
77
runs = int(runs.strip())
78
self.assertEquals(runs, 10,
79
'Ran %d times instead of 10.' % (runs,))
81
os.unlink(tmpfilename)
82
os.unlink(tmpfilename2)
84
def test_unknown_kwargs_raises_error(self):
85
self.assertRaises(exception.Error,
87
'/usr/bin/env', 'true',
88
this_is_not_a_valid_kwarg=True)
90
def test_check_exit_code_boolean(self):
91
utils.execute('/usr/bin/env', 'false', check_exit_code=False)
92
self.assertRaises(exception.ProcessExecutionError,
94
'/usr/bin/env', 'false', check_exit_code=True)
96
def test_no_retry_on_success(self):
97
fd, tmpfilename = tempfile.mkstemp()
98
_, tmpfilename2 = tempfile.mkstemp()
100
fp = os.fdopen(fd, 'w+')
101
fp.write('''#!/bin/sh
102
# If we've already run, bail out.
103
grep -q foo "$1" && exit 1
104
# Mark that we've run before.
106
# Check that stdin gets passed correctly.
110
os.chmod(tmpfilename, 0755)
111
utils.execute(tmpfilename,
116
os.unlink(tmpfilename)
117
os.unlink(tmpfilename2)
120
class GetFromPathTestCase(test.TestCase):
121
def test_tolerates_nones(self):
122
f = utils.get_from_path
125
self.assertEquals([], f(input, "a"))
126
self.assertEquals([], f(input, "a/b"))
127
self.assertEquals([], f(input, "a/b/c"))
130
self.assertEquals([], f(input, "a"))
131
self.assertEquals([], f(input, "a/b"))
132
self.assertEquals([], f(input, "a/b/c"))
134
input = [{'a': None}]
135
self.assertEquals([], f(input, "a"))
136
self.assertEquals([], f(input, "a/b"))
137
self.assertEquals([], f(input, "a/b/c"))
139
input = [{'a': {'b': None}}]
140
self.assertEquals([{'b': None}], f(input, "a"))
141
self.assertEquals([], f(input, "a/b"))
142
self.assertEquals([], f(input, "a/b/c"))
144
input = [{'a': {'b': {'c': None}}}]
145
self.assertEquals([{'b': {'c': None}}], f(input, "a"))
146
self.assertEquals([{'c': None}], f(input, "a/b"))
147
self.assertEquals([], f(input, "a/b/c"))
149
input = [{'a': {'b': {'c': None}}}, {'a': None}]
150
self.assertEquals([{'b': {'c': None}}], f(input, "a"))
151
self.assertEquals([{'c': None}], f(input, "a/b"))
152
self.assertEquals([], f(input, "a/b/c"))
154
input = [{'a': {'b': {'c': None}}}, {'a': {'b': None}}]
155
self.assertEquals([{'b': {'c': None}}, {'b': None}], f(input, "a"))
156
self.assertEquals([{'c': None}], f(input, "a/b"))
157
self.assertEquals([], f(input, "a/b/c"))
159
def test_does_select(self):
160
f = utils.get_from_path
162
input = [{'a': 'a_1'}]
163
self.assertEquals(['a_1'], f(input, "a"))
164
self.assertEquals([], f(input, "a/b"))
165
self.assertEquals([], f(input, "a/b/c"))
167
input = [{'a': {'b': 'b_1'}}]
168
self.assertEquals([{'b': 'b_1'}], f(input, "a"))
169
self.assertEquals(['b_1'], f(input, "a/b"))
170
self.assertEquals([], f(input, "a/b/c"))
172
input = [{'a': {'b': {'c': 'c_1'}}}]
173
self.assertEquals([{'b': {'c': 'c_1'}}], f(input, "a"))
174
self.assertEquals([{'c': 'c_1'}], f(input, "a/b"))
175
self.assertEquals(['c_1'], f(input, "a/b/c"))
177
input = [{'a': {'b': {'c': 'c_1'}}}, {'a': None}]
178
self.assertEquals([{'b': {'c': 'c_1'}}], f(input, "a"))
179
self.assertEquals([{'c': 'c_1'}], f(input, "a/b"))
180
self.assertEquals(['c_1'], f(input, "a/b/c"))
182
input = [{'a': {'b': {'c': 'c_1'}}},
184
self.assertEquals([{'b': {'c': 'c_1'}}, {'b': None}], f(input, "a"))
185
self.assertEquals([{'c': 'c_1'}], f(input, "a/b"))
186
self.assertEquals(['c_1'], f(input, "a/b/c"))
188
input = [{'a': {'b': {'c': 'c_1'}}},
189
{'a': {'b': {'c': 'c_2'}}}]
190
self.assertEquals([{'b': {'c': 'c_1'}}, {'b': {'c': 'c_2'}}],
192
self.assertEquals([{'c': 'c_1'}, {'c': 'c_2'}], f(input, "a/b"))
193
self.assertEquals(['c_1', 'c_2'], f(input, "a/b/c"))
195
self.assertEquals([], f(input, "a/b/c/d"))
196
self.assertEquals([], f(input, "c/a/b/d"))
197
self.assertEquals([], f(input, "i/r/t"))
199
def test_flattens_lists(self):
200
f = utils.get_from_path
202
input = [{'a': [1, 2, 3]}]
203
self.assertEquals([1, 2, 3], f(input, "a"))
204
self.assertEquals([], f(input, "a/b"))
205
self.assertEquals([], f(input, "a/b/c"))
207
input = [{'a': {'b': [1, 2, 3]}}]
208
self.assertEquals([{'b': [1, 2, 3]}], f(input, "a"))
209
self.assertEquals([1, 2, 3], f(input, "a/b"))
210
self.assertEquals([], f(input, "a/b/c"))
212
input = [{'a': {'b': [1, 2, 3]}}, {'a': {'b': [4, 5, 6]}}]
213
self.assertEquals([1, 2, 3, 4, 5, 6], f(input, "a/b"))
214
self.assertEquals([], f(input, "a/b/c"))
216
input = [{'a': [{'b': [1, 2, 3]}, {'b': [4, 5, 6]}]}]
217
self.assertEquals([1, 2, 3, 4, 5, 6], f(input, "a/b"))
218
self.assertEquals([], f(input, "a/b/c"))
220
input = [{'a': [1, 2, {'b': 'b_1'}]}]
221
self.assertEquals([1, 2, {'b': 'b_1'}], f(input, "a"))
222
self.assertEquals(['b_1'], f(input, "a/b"))
224
def test_bad_xpath(self):
225
f = utils.get_from_path
227
self.assertRaises(exception.Error, f, [], None)
228
self.assertRaises(exception.Error, f, [], "")
229
self.assertRaises(exception.Error, f, [], "/")
230
self.assertRaises(exception.Error, f, [], "/a")
231
self.assertRaises(exception.Error, f, [], "/a/")
232
self.assertRaises(exception.Error, f, [], "//")
233
self.assertRaises(exception.Error, f, [], "//a")
234
self.assertRaises(exception.Error, f, [], "a//a")
235
self.assertRaises(exception.Error, f, [], "a//a/")
236
self.assertRaises(exception.Error, f, [], "a/a/")
238
def test_real_failure1(self):
239
# Real world failure case...
240
# We weren't coping when the input was a Dictionary instead of a List
241
# This led to test_accepts_dictionaries
242
f = utils.get_from_path
244
inst = {'fixed_ip': {'floating_ips': [{'address': '1.2.3.4'}],
245
'address': '192.168.0.3'},
248
private_ips = f(inst, 'fixed_ip/address')
249
public_ips = f(inst, 'fixed_ip/floating_ips/address')
250
self.assertEquals(['192.168.0.3'], private_ips)
251
self.assertEquals(['1.2.3.4'], public_ips)
253
def test_accepts_dictionaries(self):
254
f = utils.get_from_path
256
input = {'a': [1, 2, 3]}
257
self.assertEquals([1, 2, 3], f(input, "a"))
258
self.assertEquals([], f(input, "a/b"))
259
self.assertEquals([], f(input, "a/b/c"))
261
input = {'a': {'b': [1, 2, 3]}}
262
self.assertEquals([{'b': [1, 2, 3]}], f(input, "a"))
263
self.assertEquals([1, 2, 3], f(input, "a/b"))
264
self.assertEquals([], f(input, "a/b/c"))
266
input = {'a': [{'b': [1, 2, 3]}, {'b': [4, 5, 6]}]}
267
self.assertEquals([1, 2, 3, 4, 5, 6], f(input, "a/b"))
268
self.assertEquals([], f(input, "a/b/c"))
270
input = {'a': [1, 2, {'b': 'b_1'}]}
271
self.assertEquals([1, 2, {'b': 'b_1'}], f(input, "a"))
272
self.assertEquals(['b_1'], f(input, "a/b"))
275
class GenericUtilsTestCase(test.TestCase):
276
def test_hostname_unicode_sanitization(self):
277
hostname = u"\u7684.test.example.com"
278
self.assertEqual("test.example.com",
279
utils.sanitize_hostname(hostname))
281
def test_hostname_sanitize_periods(self):
282
hostname = "....test.example.com..."
283
self.assertEqual("test.example.com",
284
utils.sanitize_hostname(hostname))
286
def test_hostname_sanitize_dashes(self):
287
hostname = "----test.example.com---"
288
self.assertEqual("test.example.com",
289
utils.sanitize_hostname(hostname))
291
def test_hostname_sanitize_characters(self):
292
hostname = "(#@&$!(@*--#&91)(__=+--test-host.example!!.com-0+"
293
self.assertEqual("91----test-host.example.com-0",
294
utils.sanitize_hostname(hostname))
296
def test_hostname_translate(self):
297
hostname = "<}\x1fh\x10e\x08l\x02l\x05o\x12!{>"
298
self.assertEqual("hello", utils.sanitize_hostname(hostname))
300
def test_bool_from_str(self):
301
self.assertTrue(utils.bool_from_str('1'))
302
self.assertTrue(utils.bool_from_str('2'))
303
self.assertTrue(utils.bool_from_str('-1'))
304
self.assertTrue(utils.bool_from_str('true'))
305
self.assertTrue(utils.bool_from_str('True'))
306
self.assertTrue(utils.bool_from_str('tRuE'))
307
self.assertFalse(utils.bool_from_str('False'))
308
self.assertFalse(utils.bool_from_str('false'))
309
self.assertFalse(utils.bool_from_str('0'))
310
self.assertFalse(utils.bool_from_str(None))
311
self.assertFalse(utils.bool_from_str('junk'))
313
def test_generate_glance_url(self):
314
generated_url = utils.generate_glance_url()
315
actual_url = "http://%s:%d" % (FLAGS.glance_host, FLAGS.glance_port)
316
self.assertEqual(generated_url, actual_url)
318
def test_read_cached_file(self):
319
self.mox.StubOutWithMock(os.path, "getmtime")
320
os.path.getmtime(mox.IgnoreArg()).AndReturn(1)
323
cache_data = {"data": 1123, "mtime": 1}
324
data = utils.read_cached_file("/this/is/a/fake", cache_data)
325
self.assertEqual(cache_data["data"], data)
327
def test_read_modified_cached_file(self):
328
self.mox.StubOutWithMock(os.path, "getmtime")
329
self.mox.StubOutWithMock(__builtin__, 'open')
330
os.path.getmtime(mox.IgnoreArg()).AndReturn(2)
332
fake_contents = "lorem ipsum"
333
fake_file = self.mox.CreateMockAnything()
334
fake_file.read().AndReturn(fake_contents)
335
fake_context_manager = self.mox.CreateMockAnything()
336
fake_context_manager.__enter__().AndReturn(fake_file)
337
fake_context_manager.__exit__(mox.IgnoreArg(),
341
__builtin__.open(mox.IgnoreArg()).AndReturn(fake_context_manager)
344
cache_data = {"data": 1123, "mtime": 1}
345
self.reload_called = False
347
def test_reload(reloaded_data):
348
self.assertEqual(reloaded_data, fake_contents)
349
self.reload_called = True
351
data = utils.read_cached_file("/this/is/a/fake", cache_data,
352
reload_func=test_reload)
353
self.assertEqual(data, fake_contents)
354
self.assertTrue(self.reload_called)
356
def test_generate_password(self):
357
password = utils.generate_password()
358
self.assertTrue([c for c in password if c in '0123456789'])
359
self.assertTrue([c for c in password
360
if c in 'abcdefghijklmnopqrstuvwxyz'])
361
self.assertTrue([c for c in password
362
if c in 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'])
364
def test_read_file_as_root(self):
365
def fake_execute(*args, **kwargs):
367
raise exception.ProcessExecutionError
368
return 'fakecontents', None
370
self.stubs.Set(utils, 'execute', fake_execute)
371
contents = utils.read_file_as_root('good')
372
self.assertEqual(contents, 'fakecontents')
373
self.assertRaises(exception.FileNotFound,
374
utils.read_file_as_root, 'bad')
376
def test_strcmp_const_time(self):
377
self.assertTrue(utils.strcmp_const_time('abc123', 'abc123'))
378
self.assertFalse(utils.strcmp_const_time('a', 'aaaaa'))
379
self.assertFalse(utils.strcmp_const_time('ABC123', 'abc123'))
381
def test_temporary_chown(self):
382
def fake_execute(*args, **kwargs):
383
if args[0] == 'chown':
384
fake_execute.uid = args[1]
385
self.stubs.Set(utils, 'execute', fake_execute)
387
with tempfile.NamedTemporaryFile() as f:
388
with utils.temporary_chown(f.name, owner_uid=2):
389
self.assertEqual(fake_execute.uid, 2)
390
self.assertEqual(fake_execute.uid, os.getuid())
392
def test_service_is_up(self):
393
fts_func = datetime.datetime.fromtimestamp
397
self.flags(service_down_time=down_time)
398
self.mox.StubOutWithMock(timeutils, 'utcnow')
401
timeutils.utcnow().AndReturn(fts_func(fake_now))
402
service = {'updated_at': fts_func(fake_now - down_time),
403
'created_at': fts_func(fake_now - down_time)}
405
result = utils.service_is_up(service)
406
self.assertTrue(result)
410
timeutils.utcnow().AndReturn(fts_func(fake_now))
411
service = {'updated_at': fts_func(fake_now - down_time + 1),
412
'created_at': fts_func(fake_now - down_time + 1)}
414
result = utils.service_is_up(service)
415
self.assertTrue(result)
419
timeutils.utcnow().AndReturn(fts_func(fake_now))
420
service = {'updated_at': fts_func(fake_now - down_time - 1),
421
'created_at': fts_func(fake_now - down_time - 1)}
423
result = utils.service_is_up(service)
424
self.assertFalse(result)
426
def test_xhtml_escape(self):
427
self.assertEqual('"foo"', utils.xhtml_escape('"foo"'))
428
self.assertEqual(''foo'', utils.xhtml_escape("'foo'"))
430
def test_hash_file(self):
431
data = 'Mary had a little lamb, its fleece as white as snow'
432
flo = StringIO.StringIO(data)
433
h1 = utils.hash_file(flo)
434
h2 = hashlib.sha1(data).hexdigest()
435
self.assertEquals(h1, h2)
438
class IsUUIDLikeTestCase(test.TestCase):
439
def assertUUIDLike(self, val, expected):
440
result = utils.is_uuid_like(val)
441
self.assertEqual(result, expected)
443
def test_good_uuid(self):
444
val = 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa'
445
self.assertUUIDLike(val, True)
447
def test_integer_passed(self):
449
self.assertUUIDLike(val, False)
451
def test_non_uuid_string_passed(self):
453
self.assertUUIDLike(val, False)
455
def test_non_uuid_string_passed2(self):
456
val = 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx'
457
self.assertUUIDLike(val, False)
459
def test_gen_valid_uuid(self):
460
self.assertUUIDLike(str(utils.gen_uuid()), True)
463
class MonkeyPatchTestCase(test.TestCase):
464
"""Unit test for utils.monkey_patch()."""
466
super(MonkeyPatchTestCase, self).setUp()
467
self.example_package = 'cinder.tests.monkey_patch_example.'
470
monkey_patch_modules=[self.example_package + 'example_a' + ':'
471
+ self.example_package + 'example_decorator'])
473
def test_monkey_patch(self):
475
cinder.tests.monkey_patch_example.CALLED_FUNCTION = []
476
from cinder.tests.monkey_patch_example import example_a
477
from cinder.tests.monkey_patch_example import example_b
479
self.assertEqual('Example function', example_a.example_function_a())
480
exampleA = example_a.ExampleClassA()
481
exampleA.example_method()
482
ret_a = exampleA.example_method_add(3, 5)
483
self.assertEqual(ret_a, 8)
485
self.assertEqual('Example function', example_b.example_function_b())
486
exampleB = example_b.ExampleClassB()
487
exampleB.example_method()
488
ret_b = exampleB.example_method_add(3, 5)
490
self.assertEqual(ret_b, 8)
491
package_a = self.example_package + 'example_a.'
492
self.assertTrue(package_a + 'example_function_a'
493
in cinder.tests.monkey_patch_example.CALLED_FUNCTION)
495
self.assertTrue(package_a + 'ExampleClassA.example_method'
496
in cinder.tests.monkey_patch_example.CALLED_FUNCTION)
497
self.assertTrue(package_a + 'ExampleClassA.example_method_add'
498
in cinder.tests.monkey_patch_example.CALLED_FUNCTION)
499
package_b = self.example_package + 'example_b.'
500
self.assertFalse(package_b + 'example_function_b'
501
in cinder.tests.monkey_patch_example.CALLED_FUNCTION)
502
self.assertFalse(package_b + 'ExampleClassB.example_method'
503
in cinder.tests.monkey_patch_example.CALLED_FUNCTION)
504
self.assertFalse(package_b + 'ExampleClassB.example_method_add'
505
in cinder.tests.monkey_patch_example.CALLED_FUNCTION)
508
class AuditPeriodTest(test.TestCase):
511
super(AuditPeriodTest, self).setUp()
512
#a fairly random time to test with
513
self.test_time = datetime.datetime(second=23,
519
timeutils.set_time_override(override_time=self.test_time)
522
timeutils.clear_time_override()
523
super(AuditPeriodTest, self).tearDown()
526
begin, end = utils.last_completed_audit_period(unit='hour')
527
self.assertEquals(begin, datetime.datetime(
532
self.assertEquals(end, datetime.datetime(
538
def test_hour_with_offset_before_current(self):
539
begin, end = utils.last_completed_audit_period(unit='hour@10')
540
self.assertEquals(begin, datetime.datetime(
546
self.assertEquals(end, datetime.datetime(
553
def test_hour_with_offset_after_current(self):
554
begin, end = utils.last_completed_audit_period(unit='hour@30')
555
self.assertEquals(begin, datetime.datetime(
561
self.assertEquals(end, datetime.datetime(
569
begin, end = utils.last_completed_audit_period(unit='day')
570
self.assertEquals(begin, datetime.datetime(
574
self.assertEquals(end, datetime.datetime(
579
def test_day_with_offset_before_current(self):
580
begin, end = utils.last_completed_audit_period(unit='day@6')
581
self.assertEquals(begin, datetime.datetime(
586
self.assertEquals(end, datetime.datetime(
592
def test_day_with_offset_after_current(self):
593
begin, end = utils.last_completed_audit_period(unit='day@10')
594
self.assertEquals(begin, datetime.datetime(
599
self.assertEquals(end, datetime.datetime(
605
def test_month(self):
606
begin, end = utils.last_completed_audit_period(unit='month')
607
self.assertEquals(begin, datetime.datetime(
611
self.assertEquals(end, datetime.datetime(
616
def test_month_with_offset_before_current(self):
617
begin, end = utils.last_completed_audit_period(unit='month@2')
618
self.assertEquals(begin, datetime.datetime(
622
self.assertEquals(end, datetime.datetime(
627
def test_month_with_offset_after_current(self):
628
begin, end = utils.last_completed_audit_period(unit='month@15')
629
self.assertEquals(begin, datetime.datetime(
633
self.assertEquals(end, datetime.datetime(
639
begin, end = utils.last_completed_audit_period(unit='year')
640
self.assertEquals(begin, datetime.datetime(
644
self.assertEquals(end, datetime.datetime(
649
def test_year_with_offset_before_current(self):
650
begin, end = utils.last_completed_audit_period(unit='year@2')
651
self.assertEquals(begin, datetime.datetime(
655
self.assertEquals(end, datetime.datetime(
660
def test_year_with_offset_after_current(self):
661
begin, end = utils.last_completed_audit_period(unit='year@6')
662
self.assertEquals(begin, datetime.datetime(
666
self.assertEquals(end, datetime.datetime(