~ubuntu-branches/debian/squeeze/bzr/squeeze

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_selftest.py

  • Committer: Bazaar Package Importer
  • Author(s): John Francesco Ferlito
  • Date: 2009-11-04 22:47:44 UTC
  • mfrom: (1.4.2 upstream) (9.1.3 sid)
  • Revision ID: james.westby@ubuntu.com-20091104224744-axzbc1qj8of7m8lk
Tags: 2.0.2-1
New upstream release.

Show diffs side-by-side

added added

removed removed

Lines of Context:
18
18
 
19
19
from cStringIO import StringIO
20
20
import os
 
21
import signal
21
22
import sys
22
23
import time
23
24
import unittest
40
41
    workingtree,
41
42
    )
42
43
from bzrlib.repofmt import (
 
44
    groupcompress_repo,
43
45
    pack_repo,
44
46
    weaverepo,
45
47
    )
49
51
    deprecated_method,
50
52
    )
51
53
from bzrlib.tests import (
 
54
    SubUnitFeature,
52
55
    test_lsprof,
53
56
    test_sftp_transport,
54
57
    TestUtil,
124
127
        self.assertEqual(sample_permutation,
125
128
                         get_transport_test_permutations(MockModule()))
126
129
 
127
 
    def test_scenarios_invlude_all_modules(self):
 
130
    def test_scenarios_include_all_modules(self):
128
131
        # this checks that the scenario generator returns as many permutations
129
132
        # as there are in all the registered transport modules - we assume if
130
133
        # this matches it's probably doing the right thing especially in
215
218
        from bzrlib.tests.per_repository import formats_to_scenarios
216
219
        formats = [("(c)", remote.RemoteRepositoryFormat()),
217
220
                   ("(d)", repository.format_registry.get(
218
 
                        'Bazaar pack repository format 1 (needs bzr 0.92)\n'))]
 
221
                    'Bazaar repository format 2a (needs bzr 1.16 or later)\n'))]
219
222
        no_vfs_scenarios = formats_to_scenarios(formats, "server", "readonly",
220
223
            None)
221
224
        vfs_scenarios = formats_to_scenarios(formats, "server", "readonly",
222
225
            vfs_transport_factory="vfs")
223
226
        # no_vfs generate scenarios without vfs_transport_factory
224
 
        self.assertEqual([
 
227
        expected = [
225
228
            ('RemoteRepositoryFormat(c)',
226
229
             {'bzrdir_format': remote.RemoteBzrDirFormat(),
227
230
              'repository_format': remote.RemoteRepositoryFormat(),
228
231
              'transport_readonly_server': 'readonly',
229
232
              'transport_server': 'server'}),
230
 
            ('RepositoryFormatKnitPack1(d)',
 
233
            ('RepositoryFormat2a(d)',
231
234
             {'bzrdir_format': bzrdir.BzrDirMetaFormat1(),
232
 
              'repository_format': pack_repo.RepositoryFormatKnitPack1(),
 
235
              'repository_format': groupcompress_repo.RepositoryFormat2a(),
233
236
              'transport_readonly_server': 'readonly',
234
 
              'transport_server': 'server'})],
235
 
            no_vfs_scenarios)
 
237
              'transport_server': 'server'})]
 
238
        self.assertEqual(expected, no_vfs_scenarios)
236
239
        self.assertEqual([
237
240
            ('RemoteRepositoryFormat(c)',
238
241
             {'bzrdir_format': remote.RemoteBzrDirFormat(),
240
243
              'transport_readonly_server': 'readonly',
241
244
              'transport_server': 'server',
242
245
              'vfs_transport_factory': 'vfs'}),
243
 
            ('RepositoryFormatKnitPack1(d)',
 
246
            ('RepositoryFormat2a(d)',
244
247
             {'bzrdir_format': bzrdir.BzrDirMetaFormat1(),
245
 
              'repository_format': pack_repo.RepositoryFormatKnitPack1(),
 
248
              'repository_format': groupcompress_repo.RepositoryFormat2a(),
246
249
              'transport_readonly_server': 'readonly',
247
250
              'transport_server': 'server',
248
251
              'vfs_transport_factory': 'vfs'})],
293
296
        from bzrlib.tests.per_interrepository import make_scenarios
294
297
        server1 = "a"
295
298
        server2 = "b"
296
 
        formats = [(str, "C1", "C2"), (int, "D1", "D2")]
 
299
        formats = [("C0", "C1", "C2"), ("D0", "D1", "D2")]
297
300
        scenarios = make_scenarios(server1, server2, formats)
298
301
        self.assertEqual([
299
 
            ('str,str,str',
300
 
             {'interrepo_class': str,
301
 
              'repository_format': 'C1',
 
302
            ('C0,str,str',
 
303
             {'repository_format': 'C1',
302
304
              'repository_format_to': 'C2',
303
305
              'transport_readonly_server': 'b',
304
306
              'transport_server': 'a'}),
305
 
            ('int,str,str',
306
 
             {'interrepo_class': int,
307
 
              'repository_format': 'D1',
 
307
            ('D0,str,str',
 
308
             {'repository_format': 'D1',
308
309
              'repository_format_to': 'D2',
309
310
              'transport_readonly_server': 'b',
310
311
              'transport_server': 'a'})],
597
598
                l.attempt_lock()
598
599
        test = TestDanglingLock('test_function')
599
600
        result = test.run()
600
 
        self.assertEqual(1, len(result.errors))
 
601
        if self._lock_check_thorough:
 
602
            self.assertEqual(1, len(result.errors))
 
603
        else:
 
604
            # When _lock_check_thorough is disabled, then we don't trigger a
 
605
            # failure
 
606
            self.assertEqual(0, len(result.errors))
601
607
 
602
608
 
603
609
class TestTestCaseWithTransport(tests.TestCaseWithTransport):
681
687
        self.assertEqual(url, t.clone('..').base)
682
688
 
683
689
 
684
 
class MockProgress(progress._BaseProgressBar):
685
 
    """Progress-bar standin that records calls.
686
 
 
687
 
    Useful for testing pb using code.
688
 
    """
689
 
 
690
 
    def __init__(self):
691
 
        progress._BaseProgressBar.__init__(self)
692
 
        self.calls = []
693
 
 
694
 
    def tick(self):
695
 
        self.calls.append(('tick',))
696
 
 
697
 
    def update(self, msg=None, current=None, total=None):
698
 
        self.calls.append(('update', msg, current, total))
699
 
 
700
 
    def clear(self):
701
 
        self.calls.append(('clear',))
702
 
 
703
 
    def note(self, msg, *args):
704
 
        self.calls.append(('note', msg, args))
705
 
 
706
 
 
707
690
class TestTestResult(tests.TestCase):
708
691
 
709
692
    def check_timing(self, test_case, expected_re):
723
706
                self.time(time.sleep, 0.003)
724
707
        self.check_timing(ShortDelayTestCase('test_short_delay'),
725
708
                          r"^ +[0-9]+ms$")
726
 
        # if a benchmark time is given, we want a x of y style result.
 
709
        # if a benchmark time is given, we now show just that time followed by
 
710
        # a star
727
711
        self.check_timing(ShortDelayTestCase('test_short_benchmark'),
728
 
                          r"^ +[0-9]+ms/ +[0-9]+ms$")
 
712
                          r"^ +[0-9]+ms\*$")
729
713
 
730
714
    def test_unittest_reporting_unittest_class(self):
731
715
        # getting the time from a non-bzrlib test works ok
861
845
        self.assertEqual(lines[1], '    foo')
862
846
        self.assertEqual(2, len(lines))
863
847
 
864
 
    def test_text_report_known_failure(self):
865
 
        # text test output formatting
866
 
        pb = MockProgress()
867
 
        result = bzrlib.tests.TextTestResult(
868
 
            StringIO(),
869
 
            descriptions=0,
870
 
            verbosity=1,
871
 
            pb=pb,
872
 
            )
873
 
        test = self.get_passing_test()
874
 
        # this seeds the state to handle reporting the test.
875
 
        result.startTest(test)
876
 
        # the err parameter has the shape:
877
 
        # (class, exception object, traceback)
878
 
        # KnownFailures dont get their tracebacks shown though, so we
879
 
        # can skip that.
880
 
        err = (tests.KnownFailure, tests.KnownFailure('foo'), None)
881
 
        result.report_known_failure(test, err)
882
 
        self.assertEqual(
883
 
            [
884
 
            ('update', '[1 in 0s] passing_test', None, None),
885
 
            ('note', 'XFAIL: %s\n%s\n', ('passing_test', err[1]))
886
 
            ],
887
 
            pb.calls)
888
 
        # known_failures should be printed in the summary, so if we run a test
889
 
        # after there are some known failures, the update prefix should match
890
 
        # this.
891
 
        result.known_failure_count = 3
892
 
        test.run(result)
893
 
        self.assertEqual(
894
 
            [
895
 
            ('update', '[2 in 0s] passing_test', None, None),
896
 
            ],
897
 
            pb.calls[2:])
898
 
 
899
848
    def get_passing_test(self):
900
849
        """Return a test object that can't be run usefully."""
901
850
        def passing_test():
943
892
        result.report_unsupported(test, feature)
944
893
        output = result_stream.getvalue()[prefix:]
945
894
        lines = output.splitlines()
946
 
        self.assertEqual(lines, ['NODEP                   0ms',
 
895
        self.assertEqual(lines, ['NODEP        0ms',
947
896
                                 "    The feature 'Feature' is not available."])
948
897
 
949
 
    def test_text_report_unsupported(self):
950
 
        # text test output formatting
951
 
        pb = MockProgress()
952
 
        result = bzrlib.tests.TextTestResult(
953
 
            StringIO(),
954
 
            descriptions=0,
955
 
            verbosity=1,
956
 
            pb=pb,
957
 
            )
958
 
        test = self.get_passing_test()
959
 
        feature = tests.Feature()
960
 
        # this seeds the state to handle reporting the test.
961
 
        result.startTest(test)
962
 
        result.report_unsupported(test, feature)
963
 
        # no output on unsupported features
964
 
        self.assertEqual(
965
 
            [('update', '[1 in 0s] passing_test', None, None)
966
 
            ],
967
 
            pb.calls)
968
 
        # the number of missing features should be printed in the progress
969
 
        # summary, so check for that.
970
 
        result.unsupported = {'foo':0, 'bar':0}
971
 
        test.run(result)
972
 
        self.assertEqual(
973
 
            [
974
 
            ('update', '[2 in 0s, 2 missing] passing_test', None, None),
975
 
            ],
976
 
            pb.calls[1:])
977
 
 
978
898
    def test_unavailable_exception(self):
979
899
        """An UnavailableFeature being raised should invoke addNotSupported."""
980
900
        class InstrumentedTestResult(tests.ExtendedTestResult):
1081
1001
        runner = tests.TextTestRunner(stream=stream)
1082
1002
        result = self.run_test_runner(runner, test)
1083
1003
        lines = stream.getvalue().splitlines()
1084
 
        self.assertEqual([
1085
 
            '',
1086
 
            '======================================================================',
1087
 
            'FAIL: unittest.FunctionTestCase (failing_test)',
1088
 
            '----------------------------------------------------------------------',
1089
 
            'Traceback (most recent call last):',
1090
 
            '    raise AssertionError(\'foo\')',
1091
 
            'AssertionError: foo',
1092
 
            '',
1093
 
            '----------------------------------------------------------------------',
1094
 
            '',
1095
 
            'FAILED (failures=1, known_failure_count=1)'],
1096
 
            lines[3:8] + lines[9:13] + lines[14:])
 
1004
        self.assertContainsRe(stream.getvalue(),
 
1005
            '(?sm)^testing.*$'
 
1006
            '.*'
 
1007
            '^======================================================================\n'
 
1008
            '^FAIL: unittest.FunctionTestCase \\(failing_test\\)\n'
 
1009
            '^----------------------------------------------------------------------\n'
 
1010
            'Traceback \\(most recent call last\\):\n'
 
1011
            '  .*' # File .*, line .*, in failing_test' - but maybe not from .pyc
 
1012
            '    raise AssertionError\\(\'foo\'\\)\n'
 
1013
            '.*'
 
1014
            '^----------------------------------------------------------------------\n'
 
1015
            '.*'
 
1016
            'FAILED \\(failures=1, known_failure_count=1\\)'
 
1017
            )
1097
1018
 
1098
1019
    def test_known_failure_ok_run(self):
1099
1020
        # run a test that generates a known failure which should be printed in the final output.
1349
1270
class _TestException(Exception):
1350
1271
    pass
1351
1272
 
 
1273
 
1352
1274
class TestTestCase(tests.TestCase):
1353
1275
    """Tests that test the core bzrlib TestCase."""
1354
1276
 
1403
1325
        # we could set something and run a test that will check
1404
1326
        # it gets sanitised, but this is probably sufficient for now:
1405
1327
        # if someone runs the test with -Dsomething it will error.
1406
 
        self.assertEqual(set(), bzrlib.debug.debug_flags)
 
1328
        flags = set()
 
1329
        if self._lock_check_thorough:
 
1330
            flags.add('strict_locks')
 
1331
        self.assertEqual(flags, bzrlib.debug.debug_flags)
1407
1332
 
1408
1333
    def change_selftest_debug_flags(self, new_flags):
1409
1334
        orig_selftest_flags = tests.selftest_debug_flags
1424
1349
                self.flags = set(bzrlib.debug.debug_flags)
1425
1350
        test = TestThatRecordsFlags('test_foo')
1426
1351
        test.run(self.make_test_result())
1427
 
        self.assertEqual(set(['a-flag']), self.flags)
 
1352
        flags = set(['a-flag'])
 
1353
        if 'disable_lock_checks' not in tests.selftest_debug_flags:
 
1354
            flags.add('strict_locks')
 
1355
        self.assertEqual(flags, self.flags)
 
1356
 
 
1357
    def test_disable_lock_checks(self):
 
1358
        """The -Edisable_lock_checks flag disables thorough checks."""
 
1359
        class TestThatRecordsFlags(tests.TestCase):
 
1360
            def test_foo(nested_self):
 
1361
                self.flags = set(bzrlib.debug.debug_flags)
 
1362
                self.test_lock_check_thorough = nested_self._lock_check_thorough
 
1363
        self.change_selftest_debug_flags(set())
 
1364
        test = TestThatRecordsFlags('test_foo')
 
1365
        test.run(self.make_test_result())
 
1366
        # By default we do strict lock checking and thorough lock/unlock
 
1367
        # tracking.
 
1368
        self.assertTrue(self.test_lock_check_thorough)
 
1369
        self.assertEqual(set(['strict_locks']), self.flags)
 
1370
        # Now set the disable_lock_checks flag, and show that this changed.
 
1371
        self.change_selftest_debug_flags(set(['disable_lock_checks']))
 
1372
        test = TestThatRecordsFlags('test_foo')
 
1373
        test.run(self.make_test_result())
 
1374
        self.assertFalse(self.test_lock_check_thorough)
 
1375
        self.assertEqual(set(), self.flags)
 
1376
 
 
1377
    def test_this_fails_strict_lock_check(self):
 
1378
        class TestThatRecordsFlags(tests.TestCase):
 
1379
            def test_foo(nested_self):
 
1380
                self.flags1 = set(bzrlib.debug.debug_flags)
 
1381
                self.thisFailsStrictLockCheck()
 
1382
                self.flags2 = set(bzrlib.debug.debug_flags)
 
1383
        # Make sure lock checking is active
 
1384
        self.change_selftest_debug_flags(set())
 
1385
        test = TestThatRecordsFlags('test_foo')
 
1386
        test.run(self.make_test_result())
 
1387
        self.assertEqual(set(['strict_locks']), self.flags1)
 
1388
        self.assertEqual(set(), self.flags2)
1428
1389
 
1429
1390
    def test_debug_flags_restored(self):
1430
1391
        """The bzrlib debug flags should be restored to their original state
1485
1446
        result = bzrlib.tests.VerboseTestResult(
1486
1447
            unittest._WritelnDecorator(output_stream),
1487
1448
            descriptions=0,
1488
 
            verbosity=2,
1489
 
            num_tests=sample_test.countTestCases())
 
1449
            verbosity=2)
1490
1450
        sample_test.run(result)
1491
1451
        self.assertContainsRe(
1492
1452
            output_stream.getvalue(),
1493
 
            r"\d+ms/ +\d+ms\n$")
 
1453
            r"\d+ms\*\n$")
1494
1454
 
1495
1455
    def test_hooks_sanitised(self):
1496
1456
        """The bzrlib hooks should be sanitised by setUp."""
1686
1646
    def test_assert_isinstance(self):
1687
1647
        self.assertIsInstance(2, int)
1688
1648
        self.assertIsInstance(u'', basestring)
1689
 
        self.assertRaises(AssertionError, self.assertIsInstance, None, int)
 
1649
        e = self.assertRaises(AssertionError, self.assertIsInstance, None, int)
 
1650
        self.assertEquals(str(e),
 
1651
            "None is an instance of <type 'NoneType'> rather than <type 'int'>")
1690
1652
        self.assertRaises(AssertionError, self.assertIsInstance, 23.3, int)
 
1653
        e = self.assertRaises(AssertionError,
 
1654
            self.assertIsInstance, None, int, "it's just not")
 
1655
        self.assertEquals(str(e),
 
1656
            "None is an instance of <type 'NoneType'> rather than <type 'int'>"
 
1657
            ": it's just not")
1691
1658
 
1692
1659
    def test_assertEndsWith(self):
1693
1660
        self.assertEndsWith('foo', 'oo')
1780
1747
 
1781
1748
    def test_make_tree_for_sftp_branch(self):
1782
1749
        """Transports backed by local directories create local trees."""
1783
 
 
 
1750
        # NB: This is arguably a bug in the definition of make_branch_and_tree.
1784
1751
        tree = self.make_branch_and_tree('t1')
1785
1752
        base = tree.bzrdir.root_transport.base
1786
1753
        self.failIf(base.startswith('sftp'),
1791
1758
                tree.branch.repository.bzrdir.root_transport)
1792
1759
 
1793
1760
 
1794
 
class TestSelftest(tests.TestCase):
 
1761
class SelfTestHelper:
 
1762
 
 
1763
    def run_selftest(self, **kwargs):
 
1764
        """Run selftest returning its output."""
 
1765
        output = StringIO()
 
1766
        old_transport = bzrlib.tests.default_transport
 
1767
        old_root = tests.TestCaseWithMemoryTransport.TEST_ROOT
 
1768
        tests.TestCaseWithMemoryTransport.TEST_ROOT = None
 
1769
        try:
 
1770
            self.assertEqual(True, tests.selftest(stream=output, **kwargs))
 
1771
        finally:
 
1772
            bzrlib.tests.default_transport = old_transport
 
1773
            tests.TestCaseWithMemoryTransport.TEST_ROOT = old_root
 
1774
        output.seek(0)
 
1775
        return output
 
1776
 
 
1777
 
 
1778
class TestSelftest(tests.TestCase, SelfTestHelper):
1795
1779
    """Tests of bzrlib.tests.selftest."""
1796
1780
 
1797
1781
    def test_selftest_benchmark_parameter_invokes_test_suite__benchmark__(self):
1805
1789
            test_suite_factory=factory)
1806
1790
        self.assertEqual([True], factory_called)
1807
1791
 
 
1792
    def factory(self):
 
1793
        """A test suite factory."""
 
1794
        class Test(tests.TestCase):
 
1795
            def a(self):
 
1796
                pass
 
1797
            def b(self):
 
1798
                pass
 
1799
            def c(self):
 
1800
                pass
 
1801
        return TestUtil.TestSuite([Test("a"), Test("b"), Test("c")])
 
1802
 
 
1803
    def test_list_only(self):
 
1804
        output = self.run_selftest(test_suite_factory=self.factory,
 
1805
            list_only=True)
 
1806
        self.assertEqual(3, len(output.readlines()))
 
1807
 
 
1808
    def test_list_only_filtered(self):
 
1809
        output = self.run_selftest(test_suite_factory=self.factory,
 
1810
            list_only=True, pattern="Test.b")
 
1811
        self.assertEndsWith(output.getvalue(), "Test.b\n")
 
1812
        self.assertLength(1, output.readlines())
 
1813
 
 
1814
    def test_list_only_excludes(self):
 
1815
        output = self.run_selftest(test_suite_factory=self.factory,
 
1816
            list_only=True, exclude_pattern="Test.b")
 
1817
        self.assertNotContainsRe("Test.b", output.getvalue())
 
1818
        self.assertLength(2, output.readlines())
 
1819
 
 
1820
    def test_random(self):
 
1821
        # test randomising by listing a number of tests.
 
1822
        output_123 = self.run_selftest(test_suite_factory=self.factory,
 
1823
            list_only=True, random_seed="123")
 
1824
        output_234 = self.run_selftest(test_suite_factory=self.factory,
 
1825
            list_only=True, random_seed="234")
 
1826
        self.assertNotEqual(output_123, output_234)
 
1827
        # "Randomizing test order..\n\n"
 
1828
        self.assertLength(5, output_123.readlines())
 
1829
        self.assertLength(5, output_234.readlines())
 
1830
 
 
1831
    def test_random_reuse_is_same_order(self):
 
1832
        # test randomising by listing a number of tests.
 
1833
        expected = self.run_selftest(test_suite_factory=self.factory,
 
1834
            list_only=True, random_seed="123")
 
1835
        repeated = self.run_selftest(test_suite_factory=self.factory,
 
1836
            list_only=True, random_seed="123")
 
1837
        self.assertEqual(expected.getvalue(), repeated.getvalue())
 
1838
 
 
1839
    def test_runner_class(self):
 
1840
        self.requireFeature(SubUnitFeature)
 
1841
        from subunit import ProtocolTestCase
 
1842
        stream = self.run_selftest(runner_class=tests.SubUnitBzrRunner,
 
1843
            test_suite_factory=self.factory)
 
1844
        test = ProtocolTestCase(stream)
 
1845
        result = unittest.TestResult()
 
1846
        test.run(result)
 
1847
        self.assertEqual(3, result.testsRun)
 
1848
 
 
1849
    def test_starting_with_single_argument(self):
 
1850
        output = self.run_selftest(test_suite_factory=self.factory,
 
1851
            starting_with=['bzrlib.tests.test_selftest.Test.a'],
 
1852
            list_only=True)
 
1853
        self.assertEqual('bzrlib.tests.test_selftest.Test.a\n',
 
1854
            output.getvalue())
 
1855
 
 
1856
    def test_starting_with_multiple_argument(self):
 
1857
        output = self.run_selftest(test_suite_factory=self.factory,
 
1858
            starting_with=['bzrlib.tests.test_selftest.Test.a',
 
1859
                'bzrlib.tests.test_selftest.Test.b'],
 
1860
            list_only=True)
 
1861
        self.assertEqual('bzrlib.tests.test_selftest.Test.a\n'
 
1862
            'bzrlib.tests.test_selftest.Test.b\n',
 
1863
            output.getvalue())
 
1864
 
 
1865
    def check_transport_set(self, transport_server):
 
1866
        captured_transport = []
 
1867
        def seen_transport(a_transport):
 
1868
            captured_transport.append(a_transport)
 
1869
        class Capture(tests.TestCase):
 
1870
            def a(self):
 
1871
                seen_transport(bzrlib.tests.default_transport)
 
1872
        def factory():
 
1873
            return TestUtil.TestSuite([Capture("a")])
 
1874
        self.run_selftest(transport=transport_server, test_suite_factory=factory)
 
1875
        self.assertEqual(transport_server, captured_transport[0])
 
1876
 
 
1877
    def test_transport_sftp(self):
 
1878
        try:
 
1879
            import bzrlib.transport.sftp
 
1880
        except errors.ParamikoNotPresent:
 
1881
            raise tests.TestSkipped("Paramiko not present")
 
1882
        self.check_transport_set(bzrlib.transport.sftp.SFTPAbsoluteServer)
 
1883
 
 
1884
    def test_transport_memory(self):
 
1885
        self.check_transport_set(bzrlib.transport.memory.MemoryServer)
 
1886
 
 
1887
 
 
1888
class TestSelftestWithIdList(tests.TestCaseInTempDir, SelfTestHelper):
 
1889
    # Does IO: reads test.list
 
1890
 
 
1891
    def test_load_list(self):
 
1892
        # Provide a list with one test - this test.
 
1893
        test_id_line = '%s\n' % self.id()
 
1894
        self.build_tree_contents([('test.list', test_id_line)])
 
1895
        # And generate a list of the tests in the suite.
 
1896
        stream = self.run_selftest(load_list='test.list', list_only=True)
 
1897
        self.assertEqual(test_id_line, stream.getvalue())
 
1898
 
 
1899
    def test_load_unknown(self):
 
1900
        # Provide a list with one test - this test.
 
1901
        # And generate a list of the tests in  the suite.
 
1902
        err = self.assertRaises(errors.NoSuchFile, self.run_selftest,
 
1903
            load_list='missing file name', list_only=True)
 
1904
 
 
1905
 
 
1906
class TestRunBzr(tests.TestCase):
 
1907
 
 
1908
    out = ''
 
1909
    err = ''
 
1910
 
 
1911
    def _run_bzr_core(self, argv, retcode=0, encoding=None, stdin=None,
 
1912
                         working_dir=None):
 
1913
        """Override _run_bzr_core to test how it is invoked by run_bzr.
 
1914
 
 
1915
        Attempts to run bzr from inside this class don't actually run it.
 
1916
 
 
1917
        We test how run_bzr actually invokes bzr in another location.
 
1918
        Here we only need to test that it is run_bzr passes the right
 
1919
        parameters to run_bzr.
 
1920
        """
 
1921
        self.argv = list(argv)
 
1922
        self.retcode = retcode
 
1923
        self.encoding = encoding
 
1924
        self.stdin = stdin
 
1925
        self.working_dir = working_dir
 
1926
        return self.out, self.err
 
1927
 
 
1928
    def test_run_bzr_error(self):
 
1929
        self.out = "It sure does!\n"
 
1930
        out, err = self.run_bzr_error(['^$'], ['rocks'], retcode=34)
 
1931
        self.assertEqual(['rocks'], self.argv)
 
1932
        self.assertEqual(34, self.retcode)
 
1933
        self.assertEqual(out, 'It sure does!\n')
 
1934
 
 
1935
    def test_run_bzr_error_regexes(self):
 
1936
        self.out = ''
 
1937
        self.err = "bzr: ERROR: foobarbaz is not versioned"
 
1938
        out, err = self.run_bzr_error(
 
1939
                ["bzr: ERROR: foobarbaz is not versioned"],
 
1940
                ['file-id', 'foobarbaz'])
 
1941
 
 
1942
    def test_encoding(self):
 
1943
        """Test that run_bzr passes encoding to _run_bzr_core"""
 
1944
        self.run_bzr('foo bar')
 
1945
        self.assertEqual(None, self.encoding)
 
1946
        self.assertEqual(['foo', 'bar'], self.argv)
 
1947
 
 
1948
        self.run_bzr('foo bar', encoding='baz')
 
1949
        self.assertEqual('baz', self.encoding)
 
1950
        self.assertEqual(['foo', 'bar'], self.argv)
 
1951
 
 
1952
    def test_retcode(self):
 
1953
        """Test that run_bzr passes retcode to _run_bzr_core"""
 
1954
        # Default is retcode == 0
 
1955
        self.run_bzr('foo bar')
 
1956
        self.assertEqual(0, self.retcode)
 
1957
        self.assertEqual(['foo', 'bar'], self.argv)
 
1958
 
 
1959
        self.run_bzr('foo bar', retcode=1)
 
1960
        self.assertEqual(1, self.retcode)
 
1961
        self.assertEqual(['foo', 'bar'], self.argv)
 
1962
 
 
1963
        self.run_bzr('foo bar', retcode=None)
 
1964
        self.assertEqual(None, self.retcode)
 
1965
        self.assertEqual(['foo', 'bar'], self.argv)
 
1966
 
 
1967
        self.run_bzr(['foo', 'bar'], retcode=3)
 
1968
        self.assertEqual(3, self.retcode)
 
1969
        self.assertEqual(['foo', 'bar'], self.argv)
 
1970
 
 
1971
    def test_stdin(self):
 
1972
        # test that the stdin keyword to run_bzr is passed through to
 
1973
        # _run_bzr_core as-is. We do this by overriding
 
1974
        # _run_bzr_core in this class, and then calling run_bzr,
 
1975
        # which is a convenience function for _run_bzr_core, so
 
1976
        # should invoke it.
 
1977
        self.run_bzr('foo bar', stdin='gam')
 
1978
        self.assertEqual('gam', self.stdin)
 
1979
        self.assertEqual(['foo', 'bar'], self.argv)
 
1980
 
 
1981
        self.run_bzr('foo bar', stdin='zippy')
 
1982
        self.assertEqual('zippy', self.stdin)
 
1983
        self.assertEqual(['foo', 'bar'], self.argv)
 
1984
 
 
1985
    def test_working_dir(self):
 
1986
        """Test that run_bzr passes working_dir to _run_bzr_core"""
 
1987
        self.run_bzr('foo bar')
 
1988
        self.assertEqual(None, self.working_dir)
 
1989
        self.assertEqual(['foo', 'bar'], self.argv)
 
1990
 
 
1991
        self.run_bzr('foo bar', working_dir='baz')
 
1992
        self.assertEqual('baz', self.working_dir)
 
1993
        self.assertEqual(['foo', 'bar'], self.argv)
 
1994
 
 
1995
    def test_reject_extra_keyword_arguments(self):
 
1996
        self.assertRaises(TypeError, self.run_bzr, "foo bar",
 
1997
                          error_regex=['error message'])
 
1998
 
 
1999
 
 
2000
class TestRunBzrCaptured(tests.TestCaseWithTransport):
 
2001
    # Does IO when testing the working_dir parameter.
 
2002
 
 
2003
    def apply_redirected(self, stdin=None, stdout=None, stderr=None,
 
2004
                         a_callable=None, *args, **kwargs):
 
2005
        self.stdin = stdin
 
2006
        self.factory_stdin = getattr(bzrlib.ui.ui_factory, "stdin", None)
 
2007
        self.factory = bzrlib.ui.ui_factory
 
2008
        self.working_dir = osutils.getcwd()
 
2009
        stdout.write('foo\n')
 
2010
        stderr.write('bar\n')
 
2011
        return 0
 
2012
 
 
2013
    def test_stdin(self):
 
2014
        # test that the stdin keyword to _run_bzr_core is passed through to
 
2015
        # apply_redirected as a StringIO. We do this by overriding
 
2016
        # apply_redirected in this class, and then calling _run_bzr_core,
 
2017
        # which calls apply_redirected.
 
2018
        self.run_bzr(['foo', 'bar'], stdin='gam')
 
2019
        self.assertEqual('gam', self.stdin.read())
 
2020
        self.assertTrue(self.stdin is self.factory_stdin)
 
2021
        self.run_bzr(['foo', 'bar'], stdin='zippy')
 
2022
        self.assertEqual('zippy', self.stdin.read())
 
2023
        self.assertTrue(self.stdin is self.factory_stdin)
 
2024
 
 
2025
    def test_ui_factory(self):
 
2026
        # each invocation of self.run_bzr should get its
 
2027
        # own UI factory, which is an instance of TestUIFactory,
 
2028
        # with stdin, stdout and stderr attached to the stdin,
 
2029
        # stdout and stderr of the invoked run_bzr
 
2030
        current_factory = bzrlib.ui.ui_factory
 
2031
        self.run_bzr(['foo'])
 
2032
        self.failIf(current_factory is self.factory)
 
2033
        self.assertNotEqual(sys.stdout, self.factory.stdout)
 
2034
        self.assertNotEqual(sys.stderr, self.factory.stderr)
 
2035
        self.assertEqual('foo\n', self.factory.stdout.getvalue())
 
2036
        self.assertEqual('bar\n', self.factory.stderr.getvalue())
 
2037
        self.assertIsInstance(self.factory, tests.TestUIFactory)
 
2038
 
 
2039
    def test_working_dir(self):
 
2040
        self.build_tree(['one/', 'two/'])
 
2041
        cwd = osutils.getcwd()
 
2042
 
 
2043
        # Default is to work in the current directory
 
2044
        self.run_bzr(['foo', 'bar'])
 
2045
        self.assertEqual(cwd, self.working_dir)
 
2046
 
 
2047
        self.run_bzr(['foo', 'bar'], working_dir=None)
 
2048
        self.assertEqual(cwd, self.working_dir)
 
2049
 
 
2050
        # The function should be run in the alternative directory
 
2051
        # but afterwards the current working dir shouldn't be changed
 
2052
        self.run_bzr(['foo', 'bar'], working_dir='one')
 
2053
        self.assertNotEqual(cwd, self.working_dir)
 
2054
        self.assertEndsWith(self.working_dir, 'one')
 
2055
        self.assertEqual(cwd, osutils.getcwd())
 
2056
 
 
2057
        self.run_bzr(['foo', 'bar'], working_dir='two')
 
2058
        self.assertNotEqual(cwd, self.working_dir)
 
2059
        self.assertEndsWith(self.working_dir, 'two')
 
2060
        self.assertEqual(cwd, osutils.getcwd())
 
2061
 
 
2062
 
 
2063
class StubProcess(object):
    """Stand-in process object used when testing run_bzr_subprocess.

    It carries canned output text, error text and a return code; nothing
    is actually executed.
    """

    def __init__(self, out="", err="", retcode=0):
        self.out, self.err = out, err
        self.returncode = retcode

    def communicate(self):
        """Return the canned (out, err) pair."""
        return (self.out, self.err)
 
2073
 
 
2074
 
 
2075
class TestRunBzrSubprocess(tests.TestCaseWithTransport):
    """Tests for run_bzr_subprocess using a stubbed start_bzr_subprocess.

    start_bzr_subprocess is overridden here so that nothing is spawned:
    every call is recorded in self.subprocess_calls and answered with
    whatever object is currently stored in self.next_subprocess.
    """

    def setUp(self):
        tests.TestCaseWithTransport.setUp(self)
        # One dict per start_bzr_subprocess call, in call order.
        self.subprocess_calls = []

    def start_bzr_subprocess(self, process_args, env_changes=None,
                             skip_if_plan_to_signal=False,
                             working_dir=None,
                             allow_plugins=False):
        """Capture what run_bzr_subprocess tries to do.

        The arguments are appended to self.subprocess_calls and the
        pre-arranged self.next_subprocess is returned.
        """
        self.subprocess_calls.append({
            'process_args': process_args,
            'env_changes': env_changes,
            'skip_if_plan_to_signal': skip_if_plan_to_signal,
            'working_dir': working_dir,
            'allow_plugins': allow_plugins,
            })
        return self.next_subprocess

    def assertRunBzrSubprocess(self, expected_args, process, *args, **kwargs):
        """Run run_bzr_subprocess with args and kwargs using a stubbed process.

        Inside TestRunBzrSubprocess we use a stub start_bzr_subprocess that
        will return static results. This assertion method populates those
        results and also checks the arguments run_bzr_subprocess generates.

        :param expected_args: dict mapping a recorded argument name to the
            value the most recent start_bzr_subprocess call must have had.
        :param process: the object the stub start_bzr_subprocess returns.
        :return: whatever run_bzr_subprocess returned.
        """
        self.next_subprocess = process
        try:
            result = self.run_bzr_subprocess(*args, **kwargs)
        finally:
            # The recorded arguments are verified whether or not
            # run_bzr_subprocess raised; an exception from it still
            # propagates once these checks have passed.
            self.next_subprocess = None
            for key, expected in expected_args.iteritems():
                self.assertEqual(expected, self.subprocess_calls[-1][key])
        return result

    def test_run_bzr_subprocess(self):
        """run_bzr_subprocess passes arguments on and checks the retcode."""
        self.assertRunBzrSubprocess({'process_args':['--version']},
            StubProcess(), '--version')
        self.assertRunBzrSubprocess({'process_args':['--version']},
            StubProcess(), ['--version'])
        # retcode=None disables retcode checking
        self.assertRunBzrSubprocess({},
            StubProcess(retcode=3), '--version', retcode=None)
        result = self.assertRunBzrSubprocess({},
            StubProcess(out="is free software"), '--version')
        self.assertContainsRe(result[0], 'is free software')
        # Running a subcommand that is missing errors
        self.assertRaises(AssertionError, self.assertRunBzrSubprocess,
            {'process_args':['--versionn']}, StubProcess(retcode=3),
            '--versionn')
        # Unless it is told to expect the error from the subprocess
        self.assertRunBzrSubprocess({},
            StubProcess(retcode=3), '--versionn', retcode=3)
        # Or to ignore retcode checking
        result = self.assertRunBzrSubprocess({},
            StubProcess(err="unknown command", retcode=3), '--versionn',
            retcode=None)
        self.assertContainsRe(result[1], 'unknown command')

    def test_env_change_passes_through(self):
        self.assertRunBzrSubprocess(
            {'env_changes':{'new':'value', 'changed':'newvalue', 'deleted':None}},
            StubProcess(), '',
            env_changes={'new':'value', 'changed':'newvalue', 'deleted':None})

    def test_no_working_dir_passed_as_None(self):
        self.assertRunBzrSubprocess({'working_dir': None}, StubProcess(), '')

    def test_no_working_dir_passed_through(self):
        # Despite the name, this checks that an explicit working_dir is
        # handed on to start_bzr_subprocess unchanged.
        self.assertRunBzrSubprocess({'working_dir': 'dir'}, StubProcess(), '',
            working_dir='dir')

    def test_run_bzr_subprocess_no_plugins(self):
        self.assertRunBzrSubprocess({'allow_plugins': False},
            StubProcess(), '')

    def test_allow_plugins(self):
        self.assertRunBzrSubprocess({'allow_plugins': True},
            StubProcess(), '', allow_plugins=True)
 
2158
 
 
2159
 
 
2160
class _DontSpawnProcess(Exception):
    """Raised by a stubbed popen so that no real process is started."""
 
2162
 
 
2163
 
 
2164
class TestStartBzrSubProcess(tests.TestCase):
    """Tests for start_bzr_subprocess that never spawn a real child.

    _popen is replaced with a recorder that stores its arguments and then
    raises _DontSpawnProcess, so each test can inspect the command line and
    the process state at the moment the child would have been started.
    """

    def check_popen_state(self):
        """Replace to make assertions when popen is called."""

    def _popen(self, *args, **kwargs):
        """Record the command that is run, so that we can ensure it is correct"""
        # NOTE(review): presumably called by start_bzr_subprocess in place
        # of a real Popen -- the caller is outside this block.
        self.check_popen_state()
        self._popen_args = args
        self._popen_kwargs = kwargs
        raise _DontSpawnProcess()

    def test_run_bzr_subprocess_no_plugins(self):
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [])
        command = self._popen_args[0]
        self.assertEqual(sys.executable, command[0])
        self.assertEqual(self.get_bzr_path(), command[1])
        self.assertEqual(['--no-plugins'], command[2:])

    def test_allow_plugins(self):
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
            allow_plugins=True)
        command = self._popen_args[0]
        self.assertEqual([], command[2:])

    def test_set_env(self):
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
        # set in the child
        def check_environment():
            self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
        self.check_popen_state = check_environment
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
            env_changes={'EXISTANT_ENV_VAR':'set variable'})
        # not set in the parent
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)

    def test_run_bzr_subprocess_env_del(self):
        """run_bzr_subprocess can remove environment variables too."""
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
        def check_environment():
            self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
        os.environ['EXISTANT_ENV_VAR'] = 'set variable'
        try:
            self.check_popen_state = check_environment
            self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess,
                [], env_changes={'EXISTANT_ENV_VAR':None})
            # Still set in parent
            self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
        finally:
            # Always undo the change to the real environment, even when an
            # assertion above fails, so the variable cannot leak into
            # other tests.
            if 'EXISTANT_ENV_VAR' in os.environ:
                del os.environ['EXISTANT_ENV_VAR']

    def test_env_del_missing(self):
        self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
        def check_environment():
            self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
        self.check_popen_state = check_environment
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
            env_changes={'NON_EXISTANT_ENV_VAR':None})

    def test_working_dir(self):
        """Test that we can specify the working dir for the child"""
        orig_getcwd = osutils.getcwd
        orig_chdir = os.chdir
        chdirs = []
        def chdir(path):
            chdirs.append(path)
        def getcwd():
            return 'current'
        # Swap in fakes so no real directory change happens; both are
        # restored in the finally clause below.
        os.chdir = chdir
        osutils.getcwd = getcwd
        try:
            self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
                working_dir='foo')
        finally:
            osutils.getcwd = orig_getcwd
            os.chdir = orig_chdir
        # First into the requested dir, then back to what getcwd reported.
        self.assertEqual(['foo', 'current'], chdirs)
 
2241
 
 
2242
 
 
2243
class TestBzrSubprocess(tests.TestCaseWithTransport):
    """Tests that start a bzr subprocess and then finish it.

    Each test pairs start_bzr_subprocess with finish_bzr_subprocess and
    checks the (stdout, stderr) result and the retcode handling.
    """

    def test_start_and_stop_bzr_subprocess(self):
        """We can start and perform other test actions while that process is
        still alive.
        """
        process = self.start_bzr_subprocess(['--version'])
        result = self.finish_bzr_subprocess(process)
        self.assertContainsRe(result[0], 'is free software')
        self.assertEqual('', result[1])

    def test_start_and_stop_bzr_subprocess_with_error(self):
        """finish_bzr_subprocess allows specification of the desired exit code.
        """
        process = self.start_bzr_subprocess(['--versionn'])
        result = self.finish_bzr_subprocess(process, retcode=3)
        self.assertEqual('', result[0])
        self.assertContainsRe(result[1], 'unknown command')

    def test_start_and_stop_bzr_subprocess_ignoring_retcode(self):
        """finish_bzr_subprocess allows the exit code to be ignored."""
        process = self.start_bzr_subprocess(['--versionn'])
        result = self.finish_bzr_subprocess(process, retcode=None)
        self.assertEqual('', result[0])
        self.assertContainsRe(result[1], 'unknown command')

    def test_start_and_stop_bzr_subprocess_with_unexpected_retcode(self):
        """finish_bzr_subprocess raises self.failureException if the retcode is
        not the expected one.
        """
        process = self.start_bzr_subprocess(['--versionn'])
        self.assertRaises(self.failureException, self.finish_bzr_subprocess,
                          process)

    def test_start_and_stop_bzr_subprocess_send_signal(self):
        """finish_bzr_subprocess can send a signal to the running process.

        The child is given SIGINT once it has printed 'running'; it is
        expected to exit with retcode 3 and report the interruption on
        stderr.
        """
        process = self.start_bzr_subprocess(['wait-until-signalled'],
                                            skip_if_plan_to_signal=True)
        self.assertEqual('running\n', process.stdout.readline())
        result = self.finish_bzr_subprocess(process, send_signal=signal.SIGINT,
                                            retcode=3)
        self.assertEqual('', result[0])
        self.assertEqual('bzr: interrupted\n', result[1])

    def test_start_and_stop_working_dir(self):
        """A subprocess started with working_dir reports that tree's root."""
        self.make_branch_and_tree('one')
        process = self.start_bzr_subprocess(['root'], working_dir='one')
        result = self.finish_bzr_subprocess(process, universal_newlines=True)
        self.assertEndsWith(result[0], 'one\n')
        self.assertEqual('', result[1])
 
2296
 
1808
2297
 
1809
2298
class TestKnownFailure(tests.TestCase):
1810
2299
 
1875
2364
        tests.TestCase.setUp(self)
1876
2365
        self.suite = TestUtil.TestSuite()
1877
2366
        self.loader = TestUtil.TestLoader()
1878
 
        self.suite.addTest(self.loader.loadTestsFromModuleNames([
1879
 
            'bzrlib.tests.test_selftest']))
 
2367
        self.suite.addTest(self.loader.loadTestsFromModule(
 
2368
            sys.modules['bzrlib.tests.test_selftest']))
1880
2369
        self.all_names = _test_ids(self.suite)
1881
2370
 
1882
2371
    def test_condition_id_re(self):
2193
2682
class TestTestSuite(tests.TestCase):
2194
2683
 
2195
2684
    def test_test_suite(self):
2196
 
        # This test is slow, so we do a single test with one test in each
2197
 
        # category
 
2685
        # This test is slow - it loads the entire test suite to operate, so we
 
2686
        # do a single test with one test in each category
2198
2687
        test_list = [
2199
2688
            # testmod_names
2200
2689
            'bzrlib.tests.blackbox.test_branch.TestBranch.test_branch',
2210
2699
        self.assertEquals(test_list, _test_ids(suite))
2211
2700
 
2212
2701
    def test_test_suite_list_and_start(self):
 
2702
        # We cannot test this at the same time as the main load, because we want
 
2703
        # to know that starting_with == None works. So a second full load is
 
2704
        # incurred.
2213
2705
        test_list = ['bzrlib.tests.test_selftest.TestTestSuite.test_test_suite']
2214
2706
        suite = tests.test_suite(test_list,
2215
2707
                                 ['bzrlib.tests.test_selftest.TestTestSuite'])
2360
2852
                return tests.ExtendedTestResult(self.stream, self.descriptions,
2361
2853
                                                self.verbosity)
2362
2854
        tests.run_suite(suite, runner_class=MyRunner, stream=StringIO())
2363
 
        self.assertEqual(calls, [suite])
 
2855
        self.assertLength(1, calls)
2364
2856
 
2365
2857
    def test_done(self):
2366
2858
        """run_suite should call result.done()"""