1
# -*- test-case-name: twisted.test.test_util -*-
2
# Copyright (c) 2001-2009 Twisted Matrix Laboratories.
3
# See LICENSE for details.
12
from twisted.trial import unittest
14
from twisted.python import util
15
from twisted.internet import reactor
16
from twisted.internet.interfaces import IReactorProcess
17
from twisted.internet.protocol import ProcessProtocol
18
from twisted.internet.defer import Deferred
19
from twisted.internet.error import ProcessDone
21
from twisted.test.test_process import MockOS
25
class UtilTestCase(unittest.TestCase):
28
l = ["a", 1, "ab", "a", 3, 4, 1, 2, 2, 4, 6]
29
self.assertEquals(util.uniquify(l), ["a", 1, "ab", 3, 4, 2, 6])
32
self.failUnless(util.raises(ZeroDivisionError, divmod, 1, 0))
33
self.failIf(util.raises(ZeroDivisionError, divmod, 0, 1))
36
util.raises(TypeError, divmod, 1, 0)
37
except ZeroDivisionError:
40
raise unittest.FailTest, "util.raises didn't raise when it should have"
42
def testUninterruptably(self):
45
exc = self.exceptions.pop()
47
raise exc(errno.EINTR, "Interrupted system call!")
50
self.exceptions = [None]
52
self.assertEquals(util.untilConcludes(f, 1, 2), 3)
53
self.assertEquals(self.calls, 1)
55
self.exceptions = [None, OSError, IOError]
57
self.assertEquals(util.untilConcludes(f, 2, 3), 5)
58
self.assertEquals(self.calls, 3)
60
def testNameToLabel(self):
62
Test the various kinds of inputs L{nameToLabel} supports.
68
('fooBar', 'Foo Bar'),
69
('fooBarBaz', 'Foo Bar Baz'),
71
for inp, out in nameData:
72
got = util.nameToLabel(inp)
75
"nameToLabel(%r) == %r != %r" % (inp, got, out))
78
def test_uidFromNumericString(self):
    """
    L{uidFromString} converts a string of base-ten digits into the
    integer it spells.
    """
    uid = util.uidFromString("100")
    self.assertEqual(uid, 100)
86
def test_uidFromUsernameString(self):
    """
    When L{uidFromString} is called with a user name rather than a
    number, it returns the numeric UID of that user.

    The name and the expected UID are both taken from the C{pwd} entry
    of the user running the test process.
    """
    pwent = pwd.getpwuid(os.getuid())
    self.assertEqual(util.uidFromString(pwent.pw_name), pwent.pw_uid)
94
test_uidFromUsernameString.skip = (
95
"Username/UID conversion requires the pwd module.")
98
def test_gidFromNumericString(self):
    """
    L{gidFromString} converts a string of base-ten digits into the
    integer it spells.
    """
    gid = util.gidFromString("100")
    self.assertEqual(gid, 100)
106
def test_gidFromGroupnameString(self):
    """
    When L{gidFromString} is called with a group name rather than a
    number, it returns the numeric GID of that group.

    The name and the expected GID are both taken from the C{grp} entry
    of the group the test process is running as.
    """
    grent = grp.getgrgid(os.getgid())
    self.assertEqual(util.gidFromString(grent.gr_name), grent.gr_gid)
114
test_gidFromGroupnameString.skip = (
115
"Group Name/GID conversion requires the grp module.")
118
def test_moduleMovedForSplitDeprecation(self):
120
Calling L{moduleMovedForSplit} results in a deprecation warning.
122
util.moduleMovedForSplit("foo", "bar", "baz", "quux", "corge", {})
123
warnings = self.flushWarnings(
124
offendingFunctions=[self.test_moduleMovedForSplitDeprecation])
126
warnings[0]['message'],
127
"moduleMovedForSplit is deprecated since Twisted 9.0.")
128
self.assertEquals(warnings[0]['category'], DeprecationWarning)
129
self.assertEquals(len(warnings), 1)
133
class TestMergeFunctionMetadata(unittest.TestCase):
135
Tests for L{mergeFunctionMetadata}.
138
def test_mergedFunctionBehavesLikeMergeTarget(self):
140
After merging C{foo}'s data into C{bar}, the returned function behaves
143
foo_object = object()
144
bar_object = object()
149
def bar(x, y, (a, b), c=10, *d, **e):
152
baz = util.mergeFunctionMetadata(foo, bar)
153
self.assertIdentical(baz(1, 2, (3, 4), quux=10), bar_object)
156
def test_moduleIsMerged(self):
158
Merging C{foo} into C{bar} returns a function with C{foo}'s
166
bar.__module__ = 'somewhere.else'
168
baz = util.mergeFunctionMetadata(foo, bar)
169
self.assertEqual(baz.__module__, foo.__module__)
172
def test_docstringIsMerged(self):
174
Merging C{foo} into C{bar} returns a function with C{foo}'s docstring.
187
baz = util.mergeFunctionMetadata(foo, bar)
188
self.assertEqual(baz.__doc__, foo.__doc__)
191
def test_nameIsMerged(self):
193
Merging C{foo} into C{bar} returns a function with C{foo}'s name.
202
baz = util.mergeFunctionMetadata(foo, bar)
203
self.assertEqual(baz.__name__, foo.__name__)
206
def test_instanceDictionaryIsMerged(self):
208
Merging C{foo} into C{bar} returns a function with C{bar}'s
209
dictionary, updated by C{foo}'s.
222
baz = util.mergeFunctionMetadata(foo, bar)
223
self.assertEqual(foo.a, baz.a)
224
self.assertEqual(foo.b, baz.b)
225
self.assertEqual(bar.c, baz.c)
229
class OrderedDictTest(unittest.TestCase):
230
def testOrderedDict(self):
231
d = util.OrderedDict()
236
self.assertEquals(repr(d), "{'a': 'b', 'b': 'a', 3: 12, 1234: 4321}")
237
self.assertEquals(d.values(), ['b', 'a', 12, 4321])
239
self.assertEquals(repr(d), "{'a': 'b', 'b': 'a', 1234: 4321}")
240
self.assertEquals(d, {'a': 'b', 'b': 'a', 1234:4321})
241
self.assertEquals(d.keys(), ['a', 'b', 1234])
242
self.assertEquals(list(d.iteritems()),
243
[('a', 'b'), ('b','a'), (1234, 4321)])
245
self.assertEquals(item, (1234, 4321))
247
def testInitialization(self):
248
d = util.OrderedDict({'monkey': 'ook',
250
self.failUnless(d._order)
252
d = util.OrderedDict(((1,1),(3,3),(2,2),(0,0)))
253
self.assertEquals(repr(d), "{1: 1, 3: 3, 2: 2, 0: 0}")
255
class InsensitiveDictTest(unittest.TestCase):
256
def testPreserve(self):
257
InsensitiveDict=util.InsensitiveDict
258
dct=InsensitiveDict({'Foo':'bar', 1:2, 'fnz':{1:2}}, preserve=1)
259
self.assertEquals(dct['fnz'], {1:2})
260
self.assertEquals(dct['foo'], 'bar')
261
self.assertEquals(dct.copy(), dct)
262
self.assertEquals(dct['foo'], dct.get('Foo'))
263
assert 1 in dct and 'foo' in dct
264
self.assertEquals(eval(repr(dct)), dct)
265
keys=['Foo', 'fnz', 1]
267
assert x in dct.keys()
268
assert (x, dct[x]) in dct.items()
269
self.assertEquals(len(keys), len(dct))
273
def testNoPreserve(self):
274
InsensitiveDict=util.InsensitiveDict
275
dct=InsensitiveDict({'Foo':'bar', 1:2, 'fnz':{1:2}}, preserve=0)
276
keys=['foo', 'fnz', 1]
278
assert x in dct.keys()
279
assert (x, dct[x]) in dct.items()
280
self.assertEquals(len(keys), len(dct))
287
class PasswordTestingProcessProtocol(ProcessProtocol):
289
Write the string C{"secret\n"} to a subprocess and then collect all of
290
its output and fire a Deferred with it when the process ends.
292
def connectionMade(self):
294
self.transport.write('secret\n')
296
def childDataReceived(self, fd, output):
297
self.output.append((fd, output))
299
def processEnded(self, reason):
300
self.finished.callback((reason, self.output))
303
class GetPasswordTest(unittest.TestCase):
304
if not IReactorProcess.providedBy(reactor):
305
skip = "Process support required to test getPassword"
307
def test_stdin(self):
309
Making sure getPassword accepts a password from standard input by
310
running a child process which uses getPassword to read in a string
311
which it then writes it out again. Write a string to the child
312
process and then read one and make sure it is the right string.
314
p = PasswordTestingProcessProtocol()
315
p.finished = Deferred()
316
reactor.spawnProcess(
322
'from twisted.python.util import getPassword\n'
323
'sys.stdout.write(getPassword())\n'
324
'sys.stdout.flush()\n')],
325
env={'PYTHONPATH': os.pathsep.join(sys.path)})
327
def processFinished((reason, output)):
328
reason.trap(ProcessDone)
329
self.assertIn((1, 'secret'), output)
331
return p.finished.addCallback(processFinished)
335
class SearchUpwardsTest(unittest.TestCase):
336
def testSearchupwards(self):
337
os.makedirs('searchupwards/a/b/c')
338
file('searchupwards/foo.txt', 'w').close()
339
file('searchupwards/a/foo.txt', 'w').close()
340
file('searchupwards/a/b/c/foo.txt', 'w').close()
341
os.mkdir('searchupwards/bar')
342
os.mkdir('searchupwards/bam')
343
os.mkdir('searchupwards/a/bar')
344
os.mkdir('searchupwards/a/b/bam')
345
actual=util.searchupwards('searchupwards/a/b/c',
348
expected=os.path.abspath('searchupwards') + os.sep
349
self.assertEqual(actual, expected)
350
shutil.rmtree('searchupwards')
351
actual=util.searchupwards('searchupwards/a/b/c',
355
self.assertEqual(actual, expected)
358
def __init__(self, x):
361
class DSU(unittest.TestCase):
363
L = [Foo(x) for x in range(20, 9, -1)]
364
L2 = util.dsu(L, lambda o: o.x)
365
self.assertEquals(range(10, 21), [o.x for o in L2])
367
class IntervalDifferentialTestCase(unittest.TestCase):
368
def testDefault(self):
369
d = iter(util.IntervalDifferential([], 10))
371
self.assertEquals(d.next(), (10, None))
373
def testSingle(self):
374
d = iter(util.IntervalDifferential([5], 10))
376
self.assertEquals(d.next(), (5, 0))
379
d = iter(util.IntervalDifferential([5, 7], 10))
381
self.assertEquals(d.next(), (5, 0))
382
self.assertEquals(d.next(), (2, 1))
383
self.assertEquals(d.next(), (3, 0))
384
self.assertEquals(d.next(), (4, 1))
385
self.assertEquals(d.next(), (1, 0))
386
self.assertEquals(d.next(), (5, 0))
387
self.assertEquals(d.next(), (1, 1))
388
self.assertEquals(d.next(), (4, 0))
389
self.assertEquals(d.next(), (3, 1))
390
self.assertEquals(d.next(), (2, 0))
391
self.assertEquals(d.next(), (5, 0))
392
self.assertEquals(d.next(), (0, 1))
394
def testTriple(self):
395
d = iter(util.IntervalDifferential([2, 4, 5], 10))
397
self.assertEquals(d.next(), (2, 0))
398
self.assertEquals(d.next(), (2, 0))
399
self.assertEquals(d.next(), (0, 1))
400
self.assertEquals(d.next(), (1, 2))
401
self.assertEquals(d.next(), (1, 0))
402
self.assertEquals(d.next(), (2, 0))
403
self.assertEquals(d.next(), (0, 1))
404
self.assertEquals(d.next(), (2, 0))
405
self.assertEquals(d.next(), (0, 2))
406
self.assertEquals(d.next(), (2, 0))
407
self.assertEquals(d.next(), (0, 1))
408
self.assertEquals(d.next(), (2, 0))
409
self.assertEquals(d.next(), (1, 2))
410
self.assertEquals(d.next(), (1, 0))
411
self.assertEquals(d.next(), (0, 1))
412
self.assertEquals(d.next(), (2, 0))
413
self.assertEquals(d.next(), (2, 0))
414
self.assertEquals(d.next(), (0, 1))
415
self.assertEquals(d.next(), (0, 2))
417
def testInsert(self):
418
d = iter(util.IntervalDifferential([], 10))
419
self.assertEquals(d.next(), (10, None))
421
self.assertEquals(d.next(), (3, 0))
422
self.assertEquals(d.next(), (3, 0))
424
self.assertEquals(d.next(), (3, 0))
425
self.assertEquals(d.next(), (3, 0))
426
self.assertEquals(d.next(), (0, 1))
427
self.assertEquals(d.next(), (3, 0))
428
self.assertEquals(d.next(), (3, 0))
429
self.assertEquals(d.next(), (0, 1))
431
def testRemove(self):
432
d = iter(util.IntervalDifferential([3, 5], 10))
433
self.assertEquals(d.next(), (3, 0))
434
self.assertEquals(d.next(), (2, 1))
435
self.assertEquals(d.next(), (1, 0))
437
self.assertEquals(d.next(), (4, 0))
438
self.assertEquals(d.next(), (5, 0))
440
self.assertEquals(d.next(), (10, None))
441
self.assertRaises(ValueError, d.removeInterval, 10)
445
class Record(util.FancyEqMixin):
447
Trivial user of L{FancyEqMixin} used by tests.
449
compareAttributes = ('a', 'b')
451
def __init__(self, a, b):
457
class DifferentRecord(util.FancyEqMixin):
459
Trivial user of L{FancyEqMixin} which is not related to L{Record}.
461
compareAttributes = ('a', 'b')
463
def __init__(self, a, b):
469
class DerivedRecord(Record):
    """
    A subclass of L{Record} which defines nothing of its own, used to
    compare instances of a base class and a class derived from it.
    """
476
class EqualToEverything(object):
478
A class the instances of which consider themselves equal to everything.
480
def __eq__(self, other):
484
def __ne__(self, other):
489
class EqualToNothing(object):
491
A class the instances of which consider themselves equal to nothing.
493
def __eq__(self, other):
497
def __ne__(self, other):
502
class EqualityTests(unittest.TestCase):
504
Tests for L{FancyEqMixin}.
506
def test_identity(self):
508
Instances of a class which mixes in L{FancyEqMixin} but which
509
defines no comparison attributes compare by identity.
511
class Empty(util.FancyEqMixin):
514
self.assertFalse(Empty() == Empty())
515
self.assertTrue(Empty() != Empty())
517
self.assertTrue(empty == empty)
518
self.assertFalse(empty != empty)
521
def test_equality(self):
    """
    Two instances of a L{FancyEqMixin} user compare equal when every
    compared attribute matches, and do not compare equal as soon as
    one of those attributes differs.
    """
    self.assertTrue(Record(1, 2) == Record(1, 2))
    # Differ in the second attribute, the first attribute, then both.
    for a, b in [(1, 3), (2, 2), (3, 4)]:
        self.assertFalse(Record(1, 2) == Record(a, b))
533
def test_unequality(self):
    """
    C{!=} between two L{Record} instances gives the opposite answer to
    C{==}: false when all attributes match, true otherwise.
    """
    self.assertFalse(Record(1, 2) != Record(1, 2))
    # Differ in the second attribute, the first attribute, then both.
    for a, b in [(1, 3), (2, 2), (3, 4)]:
        self.assertTrue(Record(1, 2) != Record(a, b))
544
def test_differentClassesEquality(self):
    """
    Instances of two unrelated classes which both mix in
    L{FancyEqMixin} do not compare equal, even when they were built
    from the same attribute values.
    """
    record = Record(1, 2)
    unrelated = DifferentRecord(1, 2)
    self.assertFalse(record == unrelated)
552
def test_differentClassesInequality(self):
    """
    Instances of two unrelated classes which both mix in
    L{FancyEqMixin} compare unequal, even when they were built from
    the same attribute values.
    """
    record = Record(1, 2)
    unrelated = DifferentRecord(1, 2)
    self.assertTrue(record != unrelated)
560
def test_inheritedClassesEquality(self):
    """
    A base-class instance on the left compares equal to an instance of
    a derived class exactly when all of their compared attributes are
    equal.
    """
    self.assertTrue(Record(1, 2) == DerivedRecord(1, 2))
    # Differ in the second attribute, the first attribute, then both.
    for a, b in [(1, 3), (2, 2), (3, 4)]:
        self.assertFalse(Record(1, 2) == DerivedRecord(a, b))
572
def test_inheritedClassesInequality(self):
    """
    A base-class instance on the left compares unequal to an instance
    of a derived class whenever any of their compared attributes
    differ, and not otherwise.
    """
    self.assertFalse(Record(1, 2) != DerivedRecord(1, 2))
    # Differ in the second attribute, the first attribute, then both.
    for a, b in [(1, 3), (2, 2), (3, 4)]:
        self.assertTrue(Record(1, 2) != DerivedRecord(a, b))
584
def test_rightHandArgumentImplementsEquality(self):
    """
    When the right-hand operand of C{==} is of a type unrelated to the
    L{FancyEqMixin}-based instance on the left, the right-hand operand
    gets to decide the result.
    """
    equalToEverything = Record(1, 2) == EqualToEverything()
    self.assertTrue(equalToEverything)
    equalToNothing = Record(1, 2) == EqualToNothing()
    self.assertFalse(equalToNothing)
595
def test_rightHandArgumentImplementsUnequality(self):
    """
    When the right-hand operand of C{!=} is of a type unrelated to the
    L{FancyEqMixin}-based instance on the left, the right-hand operand
    gets to decide the result.
    """
    unequalToEverything = Record(1, 2) != EqualToEverything()
    self.assertFalse(unequalToEverything)
    unequalToNothing = Record(1, 2) != EqualToNothing()
    self.assertTrue(unequalToNothing)
607
class RunAsEffectiveUserTests(unittest.TestCase):
609
Test for the L{util.runAsEffectiveUser} function.
612
if getattr(os, "geteuid", None) is None:
613
skip = "geteuid/seteuid not available"
616
self.mockos = MockOS()
617
self.patch(os, "geteuid", self.mockos.geteuid)
618
self.patch(os, "getegid", self.mockos.getegid)
619
self.patch(os, "seteuid", self.mockos.seteuid)
620
self.patch(os, "setegid", self.mockos.setegid)
623
def _securedFunction(self, startUID, startGID, wantUID, wantGID):
    """
    Assert that each wanted ID is either the starting ID or the last
    entry recorded in the matching C{self.mockos} call list.
    """
    # Keep the short-circuiting `or`: the call list is only indexed when
    # the wanted ID differs from the starting one (it may be empty
    # otherwise).
    uidMatches = (wantUID == startUID or
                  wantUID == self.mockos.seteuidCalls[-1])
    self.assertTrue(uidMatches)
    gidMatches = (wantGID == startGID or
                  wantGID == self.mockos.setegidCalls[-1])
    self.assertTrue(gidMatches)
633
def test_forwardResult(self):
    """
    L{util.runAsEffectiveUser} returns whatever the function it was
    given returns.
    """
    self.assertEqual(util.runAsEffectiveUser(0, 0, lambda: 1), 1)
642
def test_takeParameters(self):
    """
    L{util.runAsEffectiveUser} passes extra positional arguments on to
    the function it was given.
    """
    def double(x):
        return 2 * x

    self.assertEqual(util.runAsEffectiveUser(0, 0, double, 3), 6)
651
def test_takesKeyworkArguments(self):
    """
    L{util.runAsEffectiveUser} passes keyword arguments on to the
    function it was given.
    """
    def product(x, y=1, z=1):
        return x * y * z

    self.assertEqual(util.runAsEffectiveUser(0, 0, product, 2, z=3), 6)
660
def _testUIDGIDSwitch(self, startUID, startGID, wantUID, wantGID,
661
expectedUIDSwitches, expectedGIDSwitches):
663
Helper method checking the calls to C{os.seteuid} and C{os.setegid}
664
made by L{util.runAsEffectiveUser}, when switching from startUID to
665
wantUID and from startGID to wantGID.
667
self.mockos.euid = startUID
668
self.mockos.egid = startGID
669
util.runAsEffectiveUser(
671
self._securedFunction, startUID, startGID, wantUID, wantGID)
672
self.assertEquals(self.mockos.seteuidCalls, expectedUIDSwitches)
673
self.assertEquals(self.mockos.setegidCalls, expectedGIDSwitches)
674
self.mockos.seteuidCalls = []
675
self.mockos.setegidCalls = []
680
Check UID/GID switches when current effective UID is root.
682
self._testUIDGIDSwitch(0, 0, 0, 0, [], [])
683
self._testUIDGIDSwitch(0, 0, 1, 0, [1, 0], [])
684
self._testUIDGIDSwitch(0, 0, 0, 1, [], [1, 0])
685
self._testUIDGIDSwitch(0, 0, 1, 1, [1, 0], [1, 0])
690
Check UID/GID switches when current effective UID is non-root.
692
self._testUIDGIDSwitch(1, 0, 0, 0, [0, 1], [])
693
self._testUIDGIDSwitch(1, 0, 1, 0, [], [])
694
self._testUIDGIDSwitch(1, 0, 1, 1, [0, 1, 0, 1], [1, 0])
695
self._testUIDGIDSwitch(1, 0, 2, 1, [0, 2, 0, 1], [1, 0])
700
Check UID/GID switches when current effective GID is non-root.
702
self._testUIDGIDSwitch(0, 1, 0, 0, [], [0, 1])
703
self._testUIDGIDSwitch(0, 1, 0, 1, [], [])
704
self._testUIDGIDSwitch(0, 1, 1, 1, [1, 0], [])
705
self._testUIDGIDSwitch(0, 1, 1, 2, [1, 0], [2, 1])
708
def test_UIDGID(self):
    """
    Check the UID/GID switches made when both the starting effective
    UID and the starting effective GID are non-root (1).
    """
    # (wantUID, wantGID, expected seteuid calls, expected setegid calls)
    scenarios = [
        (0, 0, [0, 1], [0, 1]),
        (0, 1, [0, 1], []),
        (1, 0, [0, 1, 0, 1], [0, 1]),
        (1, 1, [], []),
        (2, 1, [0, 2, 0, 1], []),
        (1, 2, [0, 1, 0, 1], [2, 1]),
        (2, 2, [0, 2, 0, 1], [2, 1]),
    ]
    for wantUID, wantGID, uidSwitches, gidSwitches in scenarios:
        self._testUIDGIDSwitch(
            1, 1, wantUID, wantGID, uidSwitches, gidSwitches)
722
class UnsignedIDTests(unittest.TestCase):
724
Tests for L{util.unsignedID} and L{util.setIDFunction}.
728
Save the value of L{util._idFunction} and arrange for it to be restored
731
self.addCleanup(setattr, util, '_idFunction', util._idFunction)
734
def test_setIDFunction(self):
736
L{util.setIDFunction} returns the last value passed to it.
739
previous = util.setIDFunction(value)
740
result = util.setIDFunction(previous)
741
self.assertIdentical(value, result)
744
def test_unsignedID(self):
746
L{util.unsignedID} uses the function passed to L{util.setIDFunction} to
747
determine the unique integer id of an object and then adjusts it to be
748
positive if necessary.
753
# A fake object identity mapping
754
objects = {foo: 17, bar: -73}
758
util.setIDFunction(fakeId)
760
self.assertEquals(util.unsignedID(foo), 17)
761
self.assertEquals(util.unsignedID(bar), (sys.maxint + 1) * 2 - 73)
764
def test_defaultIDFunction(self):
766
L{util.unsignedID} uses the built in L{id} by default.
771
idValue += (sys.maxint + 1) * 2
773
self.assertEquals(util.unsignedID(obj), idValue)
777
class InitGroupsTests(unittest.TestCase):
779
Tests for L{util.initgroups}.
783
skip = "pwd not available"
787
self.addCleanup(setattr, util, "_c_initgroups", util._c_initgroups)
788
self.addCleanup(setattr, util, "setgroups", util.setgroups)
791
def test_initgroupsForceC(self):
    """
    If we fake the presence of the C extension, it's called instead of the
    Python implementation.
    """
    calls = []
    util._c_initgroups = lambda x, y: calls.append((x, y))
    setgroupsCalls = []
    # Record setgroups() calls in their own list. Appending them to
    # C{calls} would leave C{setgroupsCalls} permanently empty and make
    # the final assertion unable to fail.
    util.setgroups = setgroupsCalls.append

    util.initgroups(os.getuid(), 4)
    # The fake C function receives the user name looked up from the UID,
    # together with the group id unchanged.
    self.assertEquals(calls, [(pwd.getpwuid(os.getuid())[0], 4)])
    self.assertFalse(setgroupsCalls)
806
def test_initgroupsForcePython(self):
808
If we fake the absence of the C extension, the Python implementation is
809
called instead, calling C{os.setgroups}.
811
util._c_initgroups = None
813
util.setgroups = calls.append
814
util.initgroups(os.getuid(), os.getgid())
815
# Something should be in the calls, we don't really care what
816
self.assertTrue(calls)
819
def test_initgroupsInC(self):
821
If the C extension is present, it's called instead of the Python
822
version. We check that by making sure C{os.setgroups} is not called.
825
util.setgroups = calls.append
827
util.initgroups(os.getuid(), os.getgid())
830
self.assertFalse(calls)
833
if util._c_initgroups is None:
834
test_initgroupsInC.skip = "C initgroups not available"