1
# Copyright (C) 2005-2011 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Testing framework extensions"""
19
# TODO: Perhaps there should be an API to find out if bzr is running under the
20
# test suite -- some plugins might want to avoid making intrusive changes if
21
# this is the case. However, we want behaviour under test to diverge as
22
# little as possible, so this should be used rarely if it's added at all.
23
# (Suggestion from j-a-meinel, 2005-11-24)
25
# NOTE: Some classes in here use camelCaseNaming() rather than
26
# underscore_naming(). That's for consistency with unittest; it's not the
27
# general style of bzrlib. Please continue that consistency when adding e.g.
28
# new assertFoo() methods.
33
from cStringIO import StringIO
56
# nb: check this before importing anything else from within it
57
_testtools_version = getattr(testtools, '__version__', ())
58
if _testtools_version < (0, 9, 5):
59
raise ImportError("need at least testtools 0.9.5: %s is %r"
60
% (testtools.__file__, _testtools_version))
61
from testtools import content
68
commands as _mod_commands,
77
plugin as _mod_plugin,
84
transport as _mod_transport,
90
# lsprof not available
92
from bzrlib.smart import client, request
93
from bzrlib.transport import (
97
from bzrlib.tests import (
102
from bzrlib.ui import NullProgressView
103
from bzrlib.ui.text import TextUIFactory
105
# Mark this python module as being part of the implementation
106
# of unittest: this gives us better tracebacks where the last
107
# shown frame is the test code, not our assertXYZ.
110
default_transport = test_server.LocalURLServer
113
_unitialized_attr = object()
114
"""A sentinel needed to act as a default value in a method signature."""
117
# Subunit result codes, defined here to prevent a hard dependency on subunit.
121
# These are intentionally brought into this namespace. That way plugins, etc
122
# can just "from bzrlib.tests import TestCase, TestLoader, etc"
123
TestSuite = TestUtil.TestSuite
124
TestLoader = TestUtil.TestLoader
126
# Tests should run in a clean and clearly defined environment. The goal is to
127
# keep them isolated from the running environment as much as possible. The test
128
# framework ensures the variables defined below are set (or deleted if the
129
# value is None) before a test is run and reset to their original value after
130
# the test is run. Generally if some code depends on an environment variable,
131
# the tests should start without this variable in the environment. There are a
132
# few exceptions but you shouldn't violate this rule lightly.
136
# bzr now uses the Win32 API and doesn't rely on APPDATA, but the
137
# tests do check our impls match APPDATA
138
'BZR_EDITOR': None, # test_msgeditor manipulates this variable
142
'BZREMAIL': None, # may still be present in the environment
143
'EMAIL': 'jrandom@example.com', # set EMAIL as bzr does not guess
144
'BZR_PROGRESS_BAR': None,
146
'BZR_PLUGIN_PATH': None,
147
'BZR_DISABLE_PLUGINS': None,
148
'BZR_PLUGINS_AT': None,
149
'BZR_CONCURRENCY': None,
150
# Make sure that any text ui tests are consistent regardless of
151
# the environment the test case is run in; you may want tests that
152
# test other combinations. 'dumb' is a reasonable guess for tests
153
# going to a pipe or a StringIO.
159
'SSH_AUTH_SOCK': None,
169
# Nobody cares about ftp_proxy, FTP_PROXY AFAIK. So far at
170
# least. If you do (care), please update this comment
174
'BZR_REMOTE_PATH': None,
175
# Generally speaking, we don't want apport reporting on crashes in
176
# the test environment unless we're specifically testing apport,
177
# so that it doesn't leak into the real system environment. We
178
# use an env var so it propagates to subprocesses.
179
'APPORT_DISABLE': '1',
183
def override_os_environ(test, env=None):
    """Modify os.environ keeping a copy.

    A snapshot of the environment as it was before the call is stored on
    ``test._original_os_environ`` so that restore_os_environ() can undo the
    changes.

    :param test: A test instance

    :param env: A dict containing variable definitions to be installed.
        ``isolated_environ`` is used when this is None.
    """
    if env is None:
        # Only fall back to the default set when the caller gave nothing;
        # an explicitly supplied dict must not be discarded.
        env = isolated_environ
    # Take the snapshot before touching anything so that every pre-existing
    # value can be put back later.
    original = dict(os.environ)
    test._original_os_environ = original
    for var, value in env.items():
        osutils.set_or_unset_env(var, value)
        if var not in original:
            # The var is new, add it with a value of None, so
            # restore_os_environ will delete it
            original[var] = None
202
def restore_os_environ(test):
    """Restore os.environ to its original state.

    :param test: A test instance previously passed to override_os_environ.
    """
    saved = test._original_os_environ
    for name in saved:
        # Put the recorded value back; a recorded None is how
        # override_os_environ marks a variable that has to be deleted.
        osutils.set_or_unset_env(name, saved[name])
213
class ExtendedTestResult(testtools.TextTestResult):
214
"""Accepts, reports and accumulates the results of running tests.
216
Compared to the unittest version this class adds support for
217
profiling, benchmarking, stopping as soon as a test fails, and
218
skipping tests. There are further-specialized subclasses for
219
different types of display.
221
When a test finishes, in whatever way, it calls one of the addSuccess,
222
addFailure or addError classes. These in turn may redirect to a more
223
specific case for the special test results supported by our extended
226
Note that just one of these objects is fed the results from many tests.
231
def __init__(self, stream, descriptions, verbosity,
235
"""Construct new TestResult.
237
:param bench_history: Optionally, a writable file object to accumulate
240
testtools.TextTestResult.__init__(self, stream)
241
if bench_history is not None:
242
from bzrlib.version import _get_bzr_source_tree
243
src_tree = _get_bzr_source_tree()
246
revision_id = src_tree.get_parent_ids()[0]
248
# XXX: if this is a brand new tree, do the same as if there
252
# XXX: If there's no branch, what should we do?
254
bench_history.write("--date %s %s\n" % (time.time(), revision_id))
255
self._bench_history = bench_history
256
self.ui = ui.ui_factory
259
self.failure_count = 0
260
self.known_failure_count = 0
262
self.not_applicable_count = 0
263
self.unsupported = {}
265
self._overall_start_time = time.time()
266
self._strict = strict
267
self._first_thread_leaker_id = None
268
self._tests_leaking_threads_count = 0
269
self._traceback_from_test = None
271
def stopTestRun(self):
274
stopTime = time.time()
275
timeTaken = stopTime - self.startTime
276
# GZ 2010-07-19: Seems testtools has no printErrors method, and though
277
# the parent class method is similar have to duplicate
278
self._show_list('ERROR', self.errors)
279
self._show_list('FAIL', self.failures)
280
self.stream.write(self.sep2)
281
self.stream.write("%s %d test%s in %.3fs\n\n" % (actionTaken,
282
run, run != 1 and "s" or "", timeTaken))
283
if not self.wasSuccessful():
284
self.stream.write("FAILED (")
285
failed, errored = map(len, (self.failures, self.errors))
287
self.stream.write("failures=%d" % failed)
289
if failed: self.stream.write(", ")
290
self.stream.write("errors=%d" % errored)
291
if self.known_failure_count:
292
if failed or errored: self.stream.write(", ")
293
self.stream.write("known_failure_count=%d" %
294
self.known_failure_count)
295
self.stream.write(")\n")
297
if self.known_failure_count:
298
self.stream.write("OK (known_failures=%d)\n" %
299
self.known_failure_count)
301
self.stream.write("OK\n")
302
if self.skip_count > 0:
303
skipped = self.skip_count
304
self.stream.write('%d test%s skipped\n' %
305
(skipped, skipped != 1 and "s" or ""))
307
for feature, count in sorted(self.unsupported.items()):
308
self.stream.write("Missing feature '%s' skipped %d tests.\n" %
311
ok = self.wasStrictlySuccessful()
313
ok = self.wasSuccessful()
314
if self._first_thread_leaker_id:
316
'%s is leaking threads among %d leaking tests.\n' % (
317
self._first_thread_leaker_id,
318
self._tests_leaking_threads_count))
319
# We don't report the main thread as an active one.
321
'%d non-main threads were left active in the end.\n'
322
% (len(self._active_threads) - 1))
324
def getDescription(self, test):
327
def _extractBenchmarkTime(self, testCase, details=None):
328
"""Add a benchmark time for the current test case."""
329
if details and 'benchtime' in details:
330
return float(''.join(details['benchtime'].iter_bytes()))
331
return getattr(testCase, "_benchtime", None)
333
def _elapsedTestTimeString(self):
334
"""Return a time string for the overall time the current test has taken."""
335
return self._formatTime(self._delta_to_float(
336
self._now() - self._start_datetime))
338
def _testTimeString(self, testCase):
339
benchmark_time = self._extractBenchmarkTime(testCase)
340
if benchmark_time is not None:
341
return self._formatTime(benchmark_time) + "*"
343
return self._elapsedTestTimeString()
345
def _formatTime(self, seconds):
346
"""Format seconds as milliseconds with leading spaces."""
347
# some benchmarks can take thousands of seconds to run, so we need 8
349
return "%8dms" % (1000 * seconds)
351
def _shortened_test_description(self, test):
353
what = re.sub(r'^bzrlib\.tests\.', '', what)
356
# GZ 2010-10-04: Cloned tests may end up harmlessly calling this method
357
# multiple times in a row, because the handler is added for
358
# each test but the container list is shared between cases.
359
# See lp:498869 lp:625574 and lp:637725 for background.
360
def _record_traceback_from_test(self, exc_info):
361
"""Store the traceback from passed exc_info tuple till"""
362
self._traceback_from_test = exc_info[2]
364
def startTest(self, test):
365
super(ExtendedTestResult, self).startTest(test)
369
self.report_test_start(test)
370
test.number = self.count
371
self._recordTestStartTime()
372
# Make testtools cases give us the real traceback on failure
373
addOnException = getattr(test, "addOnException", None)
374
if addOnException is not None:
375
addOnException(self._record_traceback_from_test)
376
# Only check for thread leaks on bzrlib derived test cases
377
if isinstance(test, TestCase):
378
test.addCleanup(self._check_leaked_threads, test)
380
def stopTest(self, test):
381
super(ExtendedTestResult, self).stopTest(test)
382
# Manually break cycles, means touching various private things but hey
383
getDetails = getattr(test, "getDetails", None)
384
if getDetails is not None:
386
type_equality_funcs = getattr(test, "_type_equality_funcs", None)
387
if type_equality_funcs is not None:
388
type_equality_funcs.clear()
389
self._traceback_from_test = None
391
def startTests(self):
392
self.report_tests_starting()
393
self._active_threads = threading.enumerate()
395
def _check_leaked_threads(self, test):
396
"""See if any threads have leaked since last call
398
A sample of live threads is stored in the _active_threads attribute,
399
when this method runs it compares the current live threads and any not
400
in the previous sample are treated as having leaked.
402
now_active_threads = set(threading.enumerate())
403
threads_leaked = now_active_threads.difference(self._active_threads)
405
self._report_thread_leak(test, threads_leaked, now_active_threads)
406
self._tests_leaking_threads_count += 1
407
if self._first_thread_leaker_id is None:
408
self._first_thread_leaker_id = test.id()
409
self._active_threads = now_active_threads
411
def _recordTestStartTime(self):
412
"""Record that a test has started."""
413
self._start_datetime = self._now()
415
def addError(self, test, err):
416
"""Tell result that test finished with an error.
418
Called from the TestCase run() method when the test
419
fails with an unexpected error.
421
self._post_mortem(self._traceback_from_test)
422
super(ExtendedTestResult, self).addError(test, err)
423
self.error_count += 1
424
self.report_error(test, err)
428
def addFailure(self, test, err):
429
"""Tell result that test failed.
431
Called from the TestCase run() method when the test
432
fails because e.g. an assert() method failed.
434
self._post_mortem(self._traceback_from_test)
435
super(ExtendedTestResult, self).addFailure(test, err)
436
self.failure_count += 1
437
self.report_failure(test, err)
441
def addSuccess(self, test, details=None):
442
"""Tell result that test completed successfully.
444
Called from the TestCase run()
446
if self._bench_history is not None:
447
benchmark_time = self._extractBenchmarkTime(test, details)
448
if benchmark_time is not None:
449
self._bench_history.write("%s %s\n" % (
450
self._formatTime(benchmark_time),
452
self.report_success(test)
453
super(ExtendedTestResult, self).addSuccess(test)
454
test._log_contents = ''
456
def addExpectedFailure(self, test, err):
    """Count a known failure and pass it on to report_known_failure()."""
    self.known_failure_count = self.known_failure_count + 1
    self.report_known_failure(test, err)
460
def addUnexpectedSuccess(self, test, details=None):
461
"""Tell result the test unexpectedly passed, counting as a failure
463
When the minimum version of testtools required becomes 0.9.8 this
464
can be updated to use the new handling there.
466
super(ExtendedTestResult, self).addFailure(test, details=details)
467
self.failure_count += 1
468
self.report_unexpected_success(test,
469
"".join(details["reason"].iter_text()))
473
def addNotSupported(self, test, feature):
    """The test will not be run because of a missing feature.

    The tally in ``self.unsupported`` is keyed by ``str(feature)``; the
    feature object itself is what gets passed to report_unsupported().
    """
    # this can be called in two different ways: it may be that the
    # test started running, and then raised (through requireFeature)
    # UnavailableFeature. Alternatively this method can be called
    # while probing for features before running the test code proper; in
    # that case we will see startTest and stopTest, but the test will
    # never actually run.
    feature_name = str(feature)
    self.unsupported[feature_name] = self.unsupported.get(feature_name, 0) + 1
    self.report_unsupported(test, feature)
486
def addSkip(self, test, reason):
487
"""A test has not run for 'reason'."""
489
self.report_skip(test, reason)
491
def addNotApplicable(self, test, reason):
    """Count a not-applicable test and pass it on to report_not_applicable()."""
    self.not_applicable_count = self.not_applicable_count + 1
    self.report_not_applicable(test, reason)
495
def _post_mortem(self, tb=None):
496
"""Start a PDB post mortem session."""
497
if os.environ.get('BZR_TEST_PDB', None):
501
def progress(self, offset, whence):
    """The test is adjusting the count of tests to run.

    :param offset: The new total when ``whence`` is SUBUNIT_SEEK_SET, or
        the amount added to the current total when it is SUBUNIT_SEEK_CUR.
    :param whence: One of SUBUNIT_SEEK_SET or SUBUNIT_SEEK_CUR.
    :raises errors.BzrError: If ``whence`` is any other value.
    """
    if whence == SUBUNIT_SEEK_SET:
        self.num_tests = offset
        return
    if whence == SUBUNIT_SEEK_CUR:
        self.num_tests += offset
        return
    # Wrap the value in a one-element tuple: formatting with a bare tuple
    # operand would unpack it and raise TypeError instead of BzrError.
    raise errors.BzrError("Unknown whence %r" % (whence,))
510
def report_tests_starting(self):
511
"""Display information before the test run begins"""
512
if getattr(sys, 'frozen', None) is None:
513
bzr_path = osutils.realpath(sys.argv[0])
515
bzr_path = sys.executable
517
'bzr selftest: %s\n' % (bzr_path,))
520
bzrlib.__path__[0],))
522
' bzr-%s python-%s %s\n' % (
523
bzrlib.version_string,
524
bzrlib._format_version_tuple(sys.version_info),
525
platform.platform(aliased=1),
527
self.stream.write('\n')
529
def report_test_start(self, test):
530
"""Display information on the test just about to be run"""
532
def _report_thread_leak(self, test, leaked_threads, active_threads):
    """Display information on a test that leaked one or more threads"""
    # GZ 2010-09-09: A leak summary reported separately from the general
    #                thread debugging would be nice. Tests under subunit
    #                need something not using stream, perhaps adding a
    #                testtools details object would be fitting.
    if 'threads' not in selftest_debug_flags:
        # Nothing is written unless the 'threads' selftest flag is set.
        return
    message = '%s is leaking, active is now %d\n' % (
        test.id(), len(active_threads))
    self.stream.write(message)
542
def startTestRun(self):
543
self.startTime = time.time()
545
def report_success(self, test):
548
def wasStrictlySuccessful(self):
549
if self.unsupported or self.known_failure_count:
551
return self.wasSuccessful()
554
class TextTestResult(ExtendedTestResult):
555
"""Displays progress and results of tests in text form"""
557
def __init__(self, stream, descriptions, verbosity,
562
ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
563
bench_history, strict)
564
# We no longer pass them around, but just rely on the UIFactory stack
567
warnings.warn("Passing pb to TextTestResult is deprecated")
568
self.pb = self.ui.nested_progress_bar()
# Every display option of the progress bar is switched off here.
self.pb.show_pct = False
self.pb.show_spinner = False
# Must be the plain boolean: with a trailing comma this would assign the
# one-element tuple (False,), which is truthy.
self.pb.show_eta = False
self.pb.show_count = False
self.pb.show_bar = False
self.pb.update_latency = 0
self.pb.show_transport_activity = False
577
def stopTestRun(self):
578
# called when the tests that are going to run have run
581
super(TextTestResult, self).stopTestRun()
583
def report_tests_starting(self):
584
super(TextTestResult, self).report_tests_starting()
585
self.pb.update('[test 0/%d] Starting' % (self.num_tests))
587
def _progress_prefix_text(self):
588
# the longer this text, the less space we have to show the test
590
a = '[%d' % self.count # total that have been run
591
# tests skipped as known not to be relevant are not important enough
593
## if self.skip_count:
594
## a += ', %d skip' % self.skip_count
595
## if self.known_failure_count:
596
## a += '+%dX' % self.known_failure_count
598
a +='/%d' % self.num_tests
600
runtime = time.time() - self._overall_start_time
602
a += '%dm%ds' % (runtime / 60, runtime % 60)
605
total_fail_count = self.error_count + self.failure_count
607
a += ', %d failed' % total_fail_count
608
# if self.unsupported:
609
# a += ', %d missing' % len(self.unsupported)
613
def report_test_start(self, test):
615
self._progress_prefix_text()
617
+ self._shortened_test_description(test))
619
def _test_description(self, test):
620
return self._shortened_test_description(test)
622
def report_error(self, test, err):
623
self.stream.write('ERROR: %s\n %s\n' % (
624
self._test_description(test),
628
def report_failure(self, test, err):
629
self.stream.write('FAIL: %s\n %s\n' % (
630
self._test_description(test),
634
def report_known_failure(self, test, err):
637
def report_unexpected_success(self, test, reason):
638
self.stream.write('FAIL: %s\n %s: %s\n' % (
639
self._test_description(test),
640
"Unexpected success. Should have failed",
644
def report_skip(self, test, reason):
647
def report_not_applicable(self, test, reason):
650
def report_unsupported(self, test, feature):
651
"""test cannot be run because feature is missing."""
654
class VerboseTestResult(ExtendedTestResult):
655
"""Produce long output, with one line per test run plus times"""
657
def _ellipsize_to_right(self, a_string, final_width):
658
"""Truncate and pad a string, keeping the right hand side"""
659
if len(a_string) > final_width:
660
result = '...' + a_string[3-final_width:]
663
return result.ljust(final_width)
665
def report_tests_starting(self):
666
self.stream.write('running %d tests...\n' % self.num_tests)
667
super(VerboseTestResult, self).report_tests_starting()
669
def report_test_start(self, test):
670
name = self._shortened_test_description(test)
671
width = osutils.terminal_width()
672
if width is not None:
673
# width needs space for 6 char status, plus 1 for slash, plus an
674
# 11-char time string, plus a trailing blank
675
# when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on
677
self.stream.write(self._ellipsize_to_right(name, width-18))
679
self.stream.write(name)
682
def _error_summary(self, err):
684
return '%s%s' % (indent, err[1])
686
def report_error(self, test, err):
687
self.stream.write('ERROR %s\n%s\n'
688
% (self._testTimeString(test),
689
self._error_summary(err)))
691
def report_failure(self, test, err):
692
self.stream.write(' FAIL %s\n%s\n'
693
% (self._testTimeString(test),
694
self._error_summary(err)))
696
def report_known_failure(self, test, err):
697
self.stream.write('XFAIL %s\n%s\n'
698
% (self._testTimeString(test),
699
self._error_summary(err)))
701
def report_unexpected_success(self, test, reason):
702
self.stream.write(' FAIL %s\n%s: %s\n'
703
% (self._testTimeString(test),
704
"Unexpected success. Should have failed",
707
def report_success(self, test):
708
self.stream.write(' OK %s\n' % self._testTimeString(test))
709
for bench_called, stats in getattr(test, '_benchcalls', []):
710
self.stream.write('LSProf output for %s(%s, %s)\n' % bench_called)
711
stats.pprint(file=self.stream)
712
# flush the stream so that we get smooth output. This verbose mode is
713
# used to show the output in PQM.
716
def report_skip(self, test, reason):
717
self.stream.write(' SKIP %s\n%s\n'
718
% (self._testTimeString(test), reason))
720
def report_not_applicable(self, test, reason):
721
self.stream.write(' N/A %s\n %s\n'
722
% (self._testTimeString(test), reason))
724
def report_unsupported(self, test, feature):
725
"""test cannot be run because feature is missing."""
726
self.stream.write("NODEP %s\n The feature '%s' is not available.\n"
727
%(self._testTimeString(test), feature))
730
class TextTestRunner(object):
731
stop_on_failure = False
739
result_decorators=None,
741
"""Create a TextTestRunner.
743
:param result_decorators: An optional list of decorators to apply
744
to the result object being used by the runner. Decorators are
745
applied left to right - the first element in the list is the
748
# stream may claim to know how to write unicode strings, but in older
749
# pythons this goes sufficiently wrong that it is a bad idea. (
750
# specifically a built in file with encoding 'UTF-8' will still try
751
# to encode using ascii.
752
new_encoding = osutils.get_terminal_encoding()
753
codec = codecs.lookup(new_encoding)
754
if type(codec) is tuple:
758
encode = codec.encode
759
# GZ 2010-09-08: Really we don't want to be writing arbitrary bytes,
760
# so should swap to the plain codecs.StreamWriter
761
stream = osutils.UnicodeOrBytesToBytesWriter(encode, stream,
763
stream.encoding = new_encoding
765
self.descriptions = descriptions
766
self.verbosity = verbosity
767
self._bench_history = bench_history
768
self._strict = strict
769
self._result_decorators = result_decorators or []
772
"Run the given test case or test suite."
773
if self.verbosity == 1:
774
result_class = TextTestResult
775
elif self.verbosity >= 2:
776
result_class = VerboseTestResult
777
original_result = result_class(self.stream,
780
bench_history=self._bench_history,
783
# Signal to result objects that look at stop early policy to stop,
784
original_result.stop_early = self.stop_on_failure
785
result = original_result
786
for decorator in self._result_decorators:
787
result = decorator(result)
788
result.stop_early = self.stop_on_failure
789
result.startTestRun()
794
# higher level code uses our extended protocol to determine
795
# what exit code to give.
796
return original_result
799
def iter_suite_tests(suite):
800
"""Return all tests in a suite, recursing through nested suites"""
801
if isinstance(suite, unittest.TestCase):
803
elif isinstance(suite, unittest.TestSuite):
805
for r in iter_suite_tests(item):
808
raise Exception('unknown type %r for object %r'
809
% (type(suite), suite))
812
TestSkipped = testtools.testcase.TestSkipped
815
class TestNotApplicable(TestSkipped):
816
"""A test is not applicable to the situation where it was run.
818
This is only normally raised by parameterized tests, if they find that
819
the instance they're constructed upon does not support one aspect
824
# traceback._some_str fails to format exceptions that have the default
825
# __str__ which does an implicit ascii conversion. However, repr() on those
826
# objects works, for all that its not quite what the doctor may have ordered.
827
def _clever_some_str(value):
832
return repr(value).replace('\\n', '\n')
834
return '<unprintable %s object>' % type(value).__name__
836
traceback._some_str = _clever_some_str
839
# deprecated - use self.knownFailure(), or self.expectFailure.
840
KnownFailure = testtools.testcase._ExpectedFailure
843
class UnavailableFeature(Exception):
844
"""A feature required for this test was not available.
846
This can be considered a specialised form of SkippedTest.
848
The feature should be used to construct the exception.
852
class StringIOWrapper(object):
853
"""A wrapper around cStringIO which just adds an encoding attribute.
855
Internally we can check sys.stdout to see what the output encoding
856
should be. However, cStringIO has no encoding attribute that we can
857
set. So we wrap it instead.
862
def __init__(self, s=None):
864
self.__dict__['_cstring'] = StringIO(s)
866
self.__dict__['_cstring'] = StringIO()
868
def __getattr__(self, name, getattr=getattr):
869
return getattr(self.__dict__['_cstring'], name)
871
def __setattr__(self, name, val):
872
if name == 'encoding':
873
self.__dict__['encoding'] = val
875
return setattr(self._cstring, name, val)
878
class TestUIFactory(TextUIFactory):
879
"""A UI Factory for testing.
881
Hide the progress bar but emit note()s.
883
Allows get_password to be tested without real tty attached.
885
See also CannedInputUIFactory which lets you provide programmatic input in
888
# TODO: Capture progress events at the model level and allow them to be
889
# observed by tests that care.
891
# XXX: Should probably unify more with CannedInputUIFactory or a
892
# particular configuration of TextUIFactory, or otherwise have a clearer
893
# idea of how they're supposed to be different.
894
# See https://bugs.launchpad.net/bzr/+bug/408213
896
def __init__(self, stdout=None, stderr=None, stdin=None):
897
if stdin is not None:
898
# We use a StringIOWrapper to be able to test various
899
# encodings, but the user is still responsible to
900
# encode the string and to set the encoding attribute
901
# of StringIOWrapper.
902
stdin = StringIOWrapper(stdin)
903
super(TestUIFactory, self).__init__(stdin, stdout, stderr)
905
def get_non_echoed_password(self):
906
"""Get password from stdin without trying to handle the echo mode"""
907
password = self.stdin.readline()
910
if password[-1] == '\n':
911
password = password[:-1]
914
def make_progress_view(self):
915
return NullProgressView()
918
def isolated_doctest_setUp(test):
919
override_os_environ(test)
922
def isolated_doctest_tearDown(test):
923
restore_os_environ(test)
926
def IsolatedDocTestSuite(*args, **kwargs):
    """Overrides doctest.DocTestSuite to handle isolation.

    The method is really a factory and users are expected to use it as such.
    Any ``setUp``/``tearDown`` keyword arguments supplied by the caller are
    replaced with the isolating ones.
    """
    kwargs.update(setUp=isolated_doctest_setUp,
                  tearDown=isolated_doctest_tearDown)
    return doctest.DocTestSuite(*args, **kwargs)
937
class TestCase(testtools.TestCase):
938
"""Base class for bzr unit tests.
940
Tests that need access to disk resources should subclass
941
TestCaseInTempDir not TestCase.
943
Error and debug log messages are redirected from their usual
944
location into a temporary file, the contents of which can be
945
retrieved by _get_log(). We use a real OS file, not an in-memory object,
946
so that it can also capture file IO. When the test completes this file
947
is read into memory and removed from disk.
949
There are also convenience functions to invoke bzr's command-line
950
routine, and to build and check bzr trees.
952
In addition to the usual method of overriding tearDown(), this class also
953
allows subclasses to register cleanup functions via addCleanup, which are
954
run in order as the object is torn down. It's less likely this will be
955
accidentally overlooked.
959
# record lsprof data when performing benchmark calls.
960
_gather_lsprof_in_benchmarks = False
962
def __init__(self, methodName='testMethod'):
963
super(TestCase, self).__init__(methodName)
964
self._directory_isolation = True
965
self.exception_handlers.insert(0,
966
(UnavailableFeature, self._do_unsupported_or_skip))
967
self.exception_handlers.insert(0,
968
(TestNotApplicable, self._do_not_applicable))
971
super(TestCase, self).setUp()
972
for feature in getattr(self, '_test_needs_features', []):
973
self.requireFeature(feature)
974
self._cleanEnvironment()
977
self._benchcalls = []
978
self._benchtime = None
980
self._track_transports()
982
self._clear_debug_flags()
983
# Isolate global verbosity level, to make sure it's reproducible
984
# between tests. We should get rid of this altogether: bug 656694. --
986
self.overrideAttr(bzrlib.trace, '_verbosity_level', 0)
987
# Isolate config option expansion until its default value for bzrlib is
988
# settled on or the FIXME associated with _get_expand_default_value
989
# is addressed -- vila 20110219
990
self.overrideAttr(config, '_expand_default_value', None)
991
self._log_files = set()
992
# Each key in the ``_counters`` dict holds a value for a different
993
# counter. When the test ends, addDetail() should be used to output the
994
# counter values. This happens in install_counter_hook().
996
if 'config_stats' in selftest_debug_flags:
997
self._install_config_stats_hooks()
1002
pdb.Pdb().set_trace(sys._getframe().f_back)
1004
def discardDetail(self, name):
1005
"""Extend the addDetail, getDetails api so we can remove a detail.
1007
eg. bzr always adds the 'log' detail at startup, but we don't want to
1008
include it for skipped, xfail, etc tests.
1010
It is safe to call this for a detail that doesn't exist, in case this
1011
gets called multiple times.
1013
# We cheat. details is stored in __details which means we shouldn't
1014
# touch it. but getDetails() returns the dict directly, so we can
1016
details = self.getDetails()
1020
def install_counter_hook(self, hooks, name, counter_name=None):
1021
"""Install a counting hook.
1023
Any hook can be counted as long as it doesn't need to return a value.
1025
:param hooks: Where the hook should be installed.
1027
:param name: The hook name that will be counted.
1029
:param counter_name: The counter identifier in ``_counters``, defaults
1032
_counters = self._counters # Avoid closing over self
1033
if counter_name is None:
1035
if _counters.has_key(counter_name):
1036
raise AssertionError('%s is already used as a counter name'
1038
_counters[counter_name] = 0
1039
self.addDetail(counter_name, content.Content(content.UTF8_TEXT,
1040
lambda: ['%d' % (_counters[counter_name],)]))
1041
def increment_counter(*args, **kwargs):
1042
_counters[counter_name] += 1
1043
label = 'count %s calls' % (counter_name,)
1044
hooks.install_named_hook(name, increment_counter, label)
1045
self.addCleanup(hooks.uninstall_named_hook, name, label)
1047
def _install_config_stats_hooks(self):
1048
"""Install config hooks to count hook calls.
1051
for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1052
self.install_counter_hook(config.ConfigHooks, hook_name,
1053
'config.%s' % (hook_name,))
1055
# The OldConfigHooks are private and need special handling to protect
1056
# against recursive tests (tests that run other tests), so we just do
1057
# manually what registering them into _builtin_known_hooks will provide
1059
self.overrideAttr(config, 'OldConfigHooks', config._OldConfigHooks())
1060
for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1061
self.install_counter_hook(config.OldConfigHooks, hook_name,
1062
'old_config.%s' % (hook_name,))
1064
def _clear_debug_flags(self):
1065
"""Prevent externally set debug flags affecting tests.
1067
Tests that want to use debug flags can just set them in the
1068
debug_flags set during setup/teardown.
1070
# Start with a copy of the current debug flags we can safely modify.
1071
self.overrideAttr(debug, 'debug_flags', set(debug.debug_flags))
1072
if 'allow_debug' not in selftest_debug_flags:
1073
debug.debug_flags.clear()
1074
if 'disable_lock_checks' not in selftest_debug_flags:
1075
debug.debug_flags.add('strict_locks')
1077
def _clear_hooks(self):
    """Replace every known hook point with a freshly built one.

    The current hook objects and the lazy hook registry are remembered on
    the test and put back by ``_restoreHooks``, which is registered as a
    cleanup before any replacement is installed.
    """
    # prevent hooks affecting tests
    registry = hooks.known_hooks
    self._preserved_hooks = {}
    for _key, (owner, attr) in registry.iter_parent_objects():
        self._preserved_hooks[owner] = (attr, getattr(owner, attr))
    self._preserved_lazy_hooks = hooks._lazy_hooks
    hooks._lazy_hooks = {}
    self.addCleanup(self._restoreHooks)
    for key, (owner, attr) in registry.iter_parent_objects():
        make_hooks = registry.get(key)
        setattr(owner, attr, make_hooks())
    # this hook should always be installed
    request._install_hook()
1093
def disable_directory_isolation(self):
    """Switch directory isolation checking off for this test."""
    isolation_mode = False
    self._directory_isolation = isolation_mode
1097
def enable_directory_isolation(self):
    """Switch directory isolation checking on for this test."""
    isolation_mode = True
    self._directory_isolation = isolation_mode
1101
def _silenceUI(self):
    """Install a silent UI factory for the duration of the test."""
    # by default the UI is off; tests can turn it on if they want it.
    quiet_factory = ui.SilentUIFactory()
    self.overrideAttr(ui, 'ui_factory', quiet_factory)
1106
def _check_locks(self):
1107
"""Check that all lock take/release actions have been paired."""
1108
# We always check for mismatched locks. If a mismatch is found, we
1109
# fail unless -Edisable_lock_checks is supplied to selftest, in which
1110
# case we just print a warning.
1112
acquired_locks = [lock for action, lock in self._lock_actions
1113
if action == 'acquired']
1114
released_locks = [lock for action, lock in self._lock_actions
1115
if action == 'released']
1116
broken_locks = [lock for action, lock in self._lock_actions
1117
if action == 'broken']
1118
# trivially, given the tests for lock acquistion and release, if we
1119
# have as many in each list, it should be ok. Some lock tests also
1120
# break some locks on purpose and should be taken into account by
1121
# considering that breaking a lock is just a dirty way of releasing it.
1122
if len(acquired_locks) != (len(released_locks) + len(broken_locks)):
1124
'Different number of acquired and '
1125
'released or broken locks.\n'
1129
(acquired_locks, released_locks, broken_locks))
1130
if not self._lock_check_thorough:
1131
# Rather than fail, just warn
1132
print "Broken test %s: %s" % (self, message)
1136
def _track_locks(self):
    """Start recording lock activity and schedule the pairing check.

    Thorough checking is on unless ``disable_lock_checks`` is among the
    selftest debug flags.  ``_check_locks`` runs as a cleanup.
    """
    self._lock_actions = []
    self._lock_check_thorough = (
        'disable_lock_checks' not in selftest_debug_flags)

    self.addCleanup(self._check_locks)
    recorders = (
        ('lock_acquired', self._lock_acquired),
        ('lock_released', self._lock_released),
        ('lock_broken', self._lock_broken),
        )
    for hook_name, recorder in recorders:
        _mod_lock.Lock.hooks.install_named_hook(hook_name, recorder, None)
1152
def _lock_acquired(self, result):
1153
self._lock_actions.append(('acquired', result))
1155
def _lock_released(self, result):
1156
self._lock_actions.append(('released', result))
1158
def _lock_broken(self, result):
1159
self._lock_actions.append(('broken', result))
1161
def permit_dir(self, name):
    """Permit a directory to be used by this test. See permit_url.

    Both ``name`` as given and the base url of the transport opened for it
    are permitted.
    """
    opened = _mod_transport.get_transport(name)
    for permitted in (name, opened.base):
        self.permit_url(permitted)
1167
def permit_url(self, url):
    """Declare that url is an ok url to use in this test.

    Do this for memory transports, temporary test directory etc.

    Do not do this for the current working directory, /tmp, or any other
    preexisting non isolated url.

    :param url: The url to permit; a trailing '/' is appended when missing
        before it is added to ``self._bzr_selftest_roots``.
    """
    if url.endswith('/'):
        root = url
    else:
        root = url + '/'
    self._bzr_selftest_roots.append(root)
1179
def permit_source_tree_branch_repo(self):
    """Permit the source tree bzr is running from to be opened.

    Some code such as bzrlib.version attempts to read from the bzr branch
    that bzr is executing from (if any). This method permits that directory
    to be used in the test suite.

    Isolation is put in recording mode while the working tree at the
    source path is opened, then switched back on whatever happens.

    :raises TestSkipped: If the source path has no branch or working tree.
    """
    source_path = self.get_source_path()
    self.record_directory_isolation()
    # Nested try blocks: try/except/finally cannot be combined on the
    # oldest Python versions this file still mentions.
    try:
        try:
            workingtree.WorkingTree.open(source_path)
        except (errors.NotBranchError, errors.NoWorkingTree):
            raise TestSkipped('Needs a working tree of bzr sources')
    finally:
        self.enable_directory_isolation()
1196
def _preopen_isolate_transport(self, transport):
    """Check that all transport openings are done in the test work area.

    Path-filtering transports are unwrapped down to their backing
    transport and a leading ``readonly+`` is dropped from the url before
    it is handed to ``_preopen_isolate_url``.
    """
    while isinstance(transport, pathfilter.PathFilteringTransport):
        # Unwrap pathfiltered transports
        backing = transport.server.backing_transport
        transport = backing.clone(transport._filter('.'))
    url = transport.base
    # ReadonlySmartTCPServer_for_testing decorates the backing transport
    # urls it is given by prepending readonly+. This is appropriate as the
    # client shouldn't know that the server is readonly (or not readonly).
    # We could register all servers twice, with readonly+ prepending, but
    # that makes for a long list; this is about the same but easier to
    # maintain.
    readonly_prefix = 'readonly+'
    if url.startswith(readonly_prefix):
        url = url[len(readonly_prefix):]
    self._preopen_isolate_url(url)
1213
def _preopen_isolate_url(self, url):
1214
if not self._directory_isolation:
1216
if self._directory_isolation == 'record':
1217
self._bzr_selftest_roots.append(url)
1219
# This prevents all transports, including e.g. sftp ones backed on disk
1220
# from working unless they are explicitly granted permission. We then
1221
# depend on the code that sets up test transports to check that they are
1222
# appropriately isolated and enable their use by calling
1223
# self.permit_transport()
1224
if not osutils.is_inside_any(self._bzr_selftest_roots, url):
1225
raise errors.BzrError("Attempt to escape test isolation: %r %r"
1226
% (url, self._bzr_selftest_roots))
1228
def record_directory_isolation(self):
    """Gather accessed directories to permit later access.

    This is used for tests that access the branch bzr is running from.
    While this mode is active, ``_preopen_isolate_url`` adds every url it
    sees to the permitted roots.
    """
    isolation_mode = "record"
    self._directory_isolation = isolation_mode
1235
def start_server(self, transport_server, backing_server=None):
    """Start transport_server for this test.

    This starts the server, registers a cleanup for it and permits the
    server's urls to be used.

    :param transport_server: The server to start.
    :param backing_server: Passed to the server's ``start_server`` when
        not None.
    """
    if backing_server is not None:
        transport_server.start_server(backing_server)
    else:
        transport_server.start_server()
    self.addCleanup(transport_server.stop_server)
    # Obtain a real transport because if the server supplies a password, it
    # will be hidden from the base on the client side.
    permitted = _mod_transport.get_transport(transport_server.get_url())
    # Some transport servers effectively chroot the backing transport;
    # others like SFTPServer don't - users of the transport can walk up the
    # transport to read the entire backing transport. This wouldn't matter
    # except that the workdir tests are given - and that they expect the
    # server's url to point at - is one directory under the safety net. So
    # Branch operations into the transport will attempt to walk up one
    # directory. Chrooting all servers would avoid this but also mean that
    # we wouldn't be testing directly against non-root urls. Alternatively
    # getting the test framework to start the server with a backing server
    # at the actual safety net directory would work too, but this then
    # means that the self.get_url/self.get_transport methods would need
    # to transform all their results. On balance its cleaner to handle it
    # here, and permit a higher url when we have one of these transports.
    if permitted.base.endswith('/work/'):
        # we have safety net/test root/work
        permitted = permitted.clone('../..')
    elif isinstance(transport_server,
                    test_server.SmartTCPServer_for_testing):
        # The smart server adds a path similar to work, which is traversed
        # up from by the client. But the server is chrooted - the actual
        # backing transport is not escaped from, and VFS requests to the
        # root will error (because they try to escape the chroot).
        # Climb until cloning '..' no longer changes the base.
        parent = permitted.clone('..')
        while parent.base != permitted.base:
            permitted = parent
            parent = permitted.clone('..')
    self.permit_url(permitted.base)
1277
def _track_transports(self):
    """Install checks for transport usage.

    Starts with an empty list of permitted roots and hooks
    ``_preopen_isolate_transport`` into the ``pre_open`` hook of BzrDir.
    """
    # TestCase has no safe place it can write to.
    self._bzr_selftest_roots = []
    # Currently the easiest way to be sure that nothing is going on is to
    # hook into bzr dir opening. This leaves a small window of error for
    # transport tests, but they are well known, and we can improve on this
    # later.
    hook_label = "Check bzr directories are safe."
    bzrdir.BzrDir.hooks.install_named_hook(
        "pre_open", self._preopen_isolate_transport, hook_label)
1288
def _ndiff_strings(self, a, b):
1289
"""Return ndiff between two strings containing lines.
1291
A trailing newline is added if missing to make the strings
1293
if b and b[-1] != '\n':
1295
if a and a[-1] != '\n':
1297
difflines = difflib.ndiff(a.splitlines(True),
1299
linejunk=lambda x: False,
1300
charjunk=lambda x: False)
1301
return ''.join(difflines)
1303
def assertEqual(self, a, b, message=''):
    """Fail with a pretty-printed report unless ``a == b``.

    :param message: Optional text put on its own line before the report.
    :raises AssertionError: If the values differ, or if comparing them
        raises UnicodeError.
    """
    try:
        if a == b:
            return
    except UnicodeError:
        # If we can't compare without getting a UnicodeError, then
        # obviously they are different
        trace.mutter('UnicodeError: %s', sys.exc_info()[1])
    if message:
        message += '\n'
    report = "%snot equal:\na = %s\nb = %s\n" % (
        message, pprint.pformat(a), pprint.pformat(b))
    raise AssertionError(report)


assertEquals = assertEqual
1319
def assertEqualDiff(self, a, b, message=None):
    """Assert two texts are equal, if not raise an exception.

    This is intended for use with multi-line strings where it can
    be hard to find the differences by eye: the failure text ends with an
    ndiff of the two strings.

    :param message: Heading for the diff; replaced by a more specific one
        when the texts differ only by a final newline.
    """
    # TODO: perhaps override assertEquals to call this for strings?
    if a == b:
        return
    if message is None:
        message = "texts not equal:\n"
    if a + '\n' == b:
        message = 'first string is missing a final newline.\n'
    elif a == b + '\n':
        message = 'second string is missing a final newline.\n'
    raise AssertionError(message + self._ndiff_strings(a, b))
1337
def assertEqualMode(self, mode, mode_test):
    """Assert two file modes are equal, reporting both in octal."""
    failure_text = 'mode mismatch %o != %o' % (mode, mode_test)
    self.assertEqual(mode, mode_test, failure_text)
1341
def assertEqualStat(self, expected, actual):
    """assert that expected and actual are the same stat result.

    :param expected: A stat result.
    :param actual: A stat result.
    :raises AssertionError: If the expected and actual stat values differ
        other than by atime.
    """
    for field in ('st_size', 'st_mtime', 'st_ctime'):
        self.assertEqual(getattr(expected, field), getattr(actual, field),
                         '%s did not match' % (field,))
    if sys.platform == 'win32':
        # On Win32 both 'dev' and 'ino' cannot be trusted. In python2.4 it
        # is 'dev' that varies, in python 2.5 (6?) it is st_ino that is
        # odd. We just force it to always be 0 to avoid any problems.
        for field in ('st_dev', 'st_ino'):
            for stat_value in (expected, actual):
                self.assertEqual(0, getattr(stat_value, field))
    else:
        for field in ('st_dev', 'st_ino'):
            self.assertEqual(getattr(expected, field),
                             getattr(actual, field),
                             '%s did not match' % (field,))
    self.assertEqual(expected.st_mode, actual.st_mode,
                     'st_mode did not match')
1371
def assertLength(self, length, obj_with_len):
    """Fail unless ``len(obj_with_len)`` equals ``length``."""
    if len(obj_with_len) == length:
        return
    details = (length, len(obj_with_len), obj_with_len)
    self.fail("Incorrect length: wanted %d, got %d for %r" % details)
1377
def assertLogsError(self, exception_class, func, *args, **kwargs):
    """Assert that `func(*args, **kwargs)` quietly logs a specific error.

    ``trace.log_exception_quietly`` is wrapped while ``func`` runs so the
    exception active at each call is collected; the wrapper is removed
    again even if ``func`` raises.

    :return: The single logged exception, an instance of exception_class.
    """
    logged = []
    orig_log_exception_quietly = trace.log_exception_quietly
    try:
        def capture():
            orig_log_exception_quietly()
            logged.append(sys.exc_info()[1])
        trace.log_exception_quietly = capture
        func(*args, **kwargs)
    finally:
        trace.log_exception_quietly = orig_log_exception_quietly
    self.assertLength(1, logged)
    err = logged[0]
    self.assertIsInstance(err, exception_class)
    return err
1395
def assertPositive(self, val):
    """Fail unless ``val`` is strictly greater than zero."""
    is_positive = val > 0
    self.assertTrue(is_positive,
                    'expected a positive value, but got %s' % val)
1399
def assertNegative(self, val):
    """Fail unless ``val`` is strictly less than zero."""
    is_negative = val < 0
    self.assertTrue(is_negative,
                    'expected a negative value, but got %s' % val)
1403
def assertStartsWith(self, s, prefix):
    """Asserts that s starts with prefix."""
    if s.startswith(prefix):
        return
    raise AssertionError('string %r does not start with %r' % (s, prefix))
1407
def assertEndsWith(self, s, suffix):
    """Asserts that s ends with suffix."""
    if s.endswith(suffix):
        return
    raise AssertionError('string %r does not end with %r' % (s, suffix))
1412
def assertContainsRe(self, haystack, needle_re, flags=0):
    """Assert that haystack contains something matching needle_re.

    :param flags: Passed through to ``re.search``.
    :raises AssertionError: If nothing matches.  Haystacks that contain a
        newline or are longer than 60 characters are reported in a
        triple-quoted block.
    """
    if re.search(needle_re, haystack, flags):
        return
    if '\n' in haystack or len(haystack) > 60:
        # a long string, format it in a more readable way
        template = 'pattern "%s" not found in\n"""\\\n%s"""\n'
    else:
        template = 'pattern "%s" not found in "%s"'
    raise AssertionError(template % (needle_re, haystack))
1424
def assertNotContainsRe(self, haystack, needle_re, flags=0):
    """Assert that nothing in haystack matches needle_re.

    :param flags: Passed through to ``re.search``.
    """
    found = re.search(needle_re, haystack, flags)
    if not found:
        return
    raise AssertionError('pattern "%s" found in "%s"'
            % (needle_re, haystack))
1430
def assertContainsString(self, haystack, needle):
    """Fail unless ``haystack.find(needle)`` locates needle."""
    position = haystack.find(needle)
    if position == -1:
        self.fail("string %r not found in '''%s'''" % (needle, haystack))
1434
def assertNotContainsString(self, haystack, needle):
    """Fail if ``haystack.find(needle)`` locates needle."""
    position = haystack.find(needle)
    if position != -1:
        self.fail("string %r found in '''%s'''" % (needle, haystack))
1438
def assertSubset(self, sublist, superlist):
    """Assert that every entry in sublist is present in superlist.

    :raises AssertionError: Naming the set of missing values.
    """
    missing = set(sublist).difference(superlist)
    if not missing:
        return
    raise AssertionError("value(s) %r not present in container %r" %
                         (missing, superlist))
1445
def assertListRaises(self, excClass, func, *args, **kwargs):
    """Fail unless excClass is raised when the iterator from func is used.

    Many functions can return generators this makes sure
    to wrap them in a list() call to make sure the whole generator
    is run, and that the proper exception is raised.

    :return: The exception that was raised.
    """
    try:
        list(func(*args, **kwargs))
    except excClass:
        return sys.exc_info()[1]
    excName = getattr(excClass, '__name__', None)
    if excName is None:
        excName = str(excClass)
    raise self.failureException("%s not raised" % excName)
1463
def assertRaises(self, excClass, callableObj, *args, **kwargs):
    """Assert that a callable raises a particular exception.

    :param excClass: As for the except statement, this may be either an
        exception class, or a tuple of classes.
    :param callableObj: A callable, will be passed ``*args`` and
        ``**kwargs``.

    Returns the exception so that you can examine it.
    """
    try:
        callableObj(*args, **kwargs)
    except excClass:
        return sys.exc_info()[1]
    excName = getattr(excClass, '__name__', None)
    if excName is None:
        # probably a tuple
        excName = str(excClass)
    raise self.failureException("%s not raised" % excName)
1485
def assertIs(self, left, right, message=None):
    """Fail unless left and right are the very same object.

    :param message: Used as the whole failure text when given.
    """
    if left is right:
        return
    if message is None:
        message = "%r is not %r." % (left, right)
    raise AssertionError(message)
1492
def assertIsNot(self, left, right, message=None):
    """Fail if left and right are the very same object.

    :param message: Used as the whole failure text when given.
    """
    if left is not right:
        return
    if message is None:
        message = "%r is %r." % (left, right)
    raise AssertionError(message)
1499
def assertTransportMode(self, transport, path, mode):
    """Fail if a path does not have mode "mode".

    If modes are not supported on this transport, the assertion is ignored.

    :param transport: The transport used to stat ``path``.
    :param mode: The expected permission bits.
    """
    if not transport._can_roundtrip_unix_modebits():
        return
    actual_mode = stat.S_IMODE(transport.stat(path).st_mode)
    failure_text = 'mode of %r incorrect (%s != %s)' % (
        path, oct(mode), oct(actual_mode))
    self.assertEqual(mode, actual_mode, failure_text)
1512
def assertIsSameRealPath(self, path1, path2):
    """Fail if path1 and path2 points to different files.

    The paths are compared after ``osutils.realpath``; the failure text
    also shows them as they were given.
    """
    real1 = osutils.realpath(path1)
    real2 = osutils.realpath(path2)
    self.assertEqual(real1, real2,
        "apparent paths:\na = %s\nb = %s\n," % (path1, path2))
1518
def assertIsInstance(self, obj, kls, msg=None):
    """Fail if obj is not an instance of kls

    :param msg: Supplementary message to show if the assertion fails.
    """
    if isinstance(obj, kls):
        return
    failure_text = "%r is an instance of %s rather than %s" % (
        obj, obj.__class__, kls)
    if msg:
        failure_text += ": " + msg
    self.fail(failure_text)
1530
def assertFileEqual(self, content, path):
    """Fail if path does not contain 'content'.

    The path must exist; it is read in binary mode and compared with
    ``assertEqualDiff`` so a mismatch is reported as a diff.

    :param content: The expected file content.
    :param path: The file to read.
    """
    self.assertPathExists(path)
    # open() rather than the file() constructor: it is the documented way
    # to open a file, and file() does not exist on Python 3.
    f = open(path, 'rb')
    try:
        s = f.read()
    finally:
        f.close()
    self.assertEqualDiff(content, s)
1540
def assertDocstring(self, expected_docstring, obj):
    """Fail if obj does not have expected_docstring

    When this module itself has no docstring, ``obj.__doc__`` is expected
    to be None rather than ``expected_docstring``.
    """
    actual_docstring = obj.__doc__
    if __doc__ is None:
        # With -OO the docstring should be None instead
        self.assertIs(actual_docstring, None)
    else:
        self.assertEqual(expected_docstring, actual_docstring)
1548
@symbol_versioning.deprecated_method(symbol_versioning.deprecated_in((2, 4)))
def failUnlessExists(self, path):
    """Deprecated spelling of assertPathExists; forwards to it."""
    outcome = self.assertPathExists(path)
    return outcome
1552
def assertPathExists(self, path):
    """Fail unless path or paths, which may be abs or relative, exist.

    :param path: A single path string, or an iterable of paths, each of
        which is checked in turn.
    """
    if isinstance(path, basestring):
        self.assertTrue(osutils.lexists(path),
            path + " does not exist")
        return
    for each_path in path:
        self.assertPathExists(each_path)
1561
@symbol_versioning.deprecated_method(symbol_versioning.deprecated_in((2, 4)))
def failIfExists(self, path):
    """Deprecated spelling of assertPathDoesNotExist; forwards to it."""
    outcome = self.assertPathDoesNotExist(path)
    return outcome
1565
def assertPathDoesNotExist(self, path):
    """Fail if path or paths, which may be abs or relative, exist.

    :param path: A single path string, or an iterable of paths, each of
        which is checked in turn.
    """
    if isinstance(path, basestring):
        self.assertFalse(osutils.lexists(path),
            path + " exists")
        return
    for each_path in path:
        self.assertPathDoesNotExist(each_path)
1574
def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
    """A helper for callDeprecated and applyDeprecated.

    The symbol_versioning warning method is swapped for a collector while
    the callable runs, and put back afterwards even if it raises.

    :param a_callable: A callable to call.
    :param args: The positional arguments for the callable
    :param kwargs: The keyword arguments for the callable
    :return: A tuple (warnings, result). result is the result of calling
        a_callable(``*args``, ``**kwargs``).
    """
    collected = []
    def capture_warnings(msg, cls=None, stacklevel=None):
        # we've hooked into a deprecation specific callpath,
        # only deprecations should getting sent via it.
        self.assertEqual(cls, DeprecationWarning)
        collected.append(msg)
    original_warning_method = symbol_versioning.warn
    symbol_versioning.set_warning_method(capture_warnings)
    try:
        result = a_callable(*args, **kwargs)
    finally:
        symbol_versioning.set_warning_method(original_warning_method)
    return (collected, result)
1597
def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
    """Call a deprecated callable without warning the user.

    Note that this only captures warnings raised by symbol_versioning.warn,
    not other callers that go direct to the warning module.

    To test that a deprecated method raises an error, do something like
    this (remember that both assertRaises and applyDeprecated delays *args
    and **kwargs passing)::

        self.assertRaises(errors.ReservedId,
            self.applyDeprecated,
            deprecated_in((1, 5, 0)),
            br.append_revision,
            'current:')

    :param deprecation_format: The deprecation format that the callable
        should have been deprecated with. This is the same type as the
        parameter to deprecated_method/deprecated_function. If the
        callable is not deprecated with this format, an assertion error
        will be raised.
    :param a_callable: A callable to call. This may be a bound method or
        a regular function. It will be called with ``*args`` and
        ``**kwargs``.
    :param args: The positional arguments for the callable
    :param kwargs: The keyword arguments for the callable
    :return: The result of a_callable(``*args``, ``**kwargs``)
    """
    captured = self._capture_deprecation_warnings(
        a_callable, *args, **kwargs)
    call_warnings, result = captured
    expected_first_warning = symbol_versioning.deprecation_string(
        a_callable, deprecation_format)
    if not call_warnings:
        self.fail("No deprecation warning generated by call to %s" %
            a_callable)
    # Only the first warning is compared with the expected text.
    self.assertEqual(expected_first_warning, call_warnings[0])
    return result
1635
def callCatchWarnings(self, fn, *args, **kw):
    """Call a callable that raises python warnings.

    The caller's responsible for examining the returned warnings.

    If the callable raises an exception, the exception is not
    caught and propagates up to the caller.  In that case, the list
    of warnings is not available.

    ``warnings.showwarning`` and ``warnings.filters`` are replaced while
    the callable runs and restored afterwards.

    :returns: ([warning_object, ...], fn_result)
    """
    # XXX: This is not perfect, because it completely overrides the
    # warnings filters, and some code may depend on suppressing particular
    # warnings.  It's the easiest way to insulate ourselves from -Werror,
    # though.  -- Andrew, 20071062
    caught = []
    def _catcher(message, category, filename, lineno, file=None, line=None):
        # despite the name, 'message' is normally(?) a Warning subclass
        # instance
        caught.append(message)
    saved_showwarning = warnings.showwarning
    saved_filters = warnings.filters
    try:
        warnings.showwarning = _catcher
        warnings.filters = []
        result = fn(*args, **kw)
    finally:
        warnings.showwarning = saved_showwarning
        warnings.filters = saved_filters
    return caught, result
1666
def callDeprecated(self, expected, callable, *args, **kwargs):
    """Assert that a callable is deprecated in a particular way.

    This is a very precise test for unusual requirements. The
    applyDeprecated helper function is probably more suited for most tests
    as it allows you to simply specify the deprecation format being used
    and will ensure that that is issued for the function being called.

    Note that this only captures warnings raised by symbol_versioning.warn,
    not other callers that go direct to the warning module.  To catch
    general warnings, use callCatchWarnings.

    :param expected: a list of the deprecation warnings expected, in order
    :param callable: The callable to call
    :param args: The positional arguments for the callable
    :param kwargs: The keyword arguments for the callable
    :return: The result of calling the callable.
    """
    captured = self._capture_deprecation_warnings(
        callable, *args, **kwargs)
    call_warnings, result = captured
    self.assertEqual(expected, call_warnings)
    return result
1688
def _startLogFile(self):
    """Send bzr and test log messages to an in-memory buffer.

    The buffer is exposed as the 'log' detail of the test and pushed as
    the trace log file; ``_finishLogFile`` is registered as a cleanup.
    """
    log_buffer = StringIO()
    def _get_log_contents_for_weird_testtools_api():
        # Round-trip through unicode so undecodable bytes are replaced.
        raw = log_buffer.getvalue()
        return [raw.decode("utf-8", "replace").encode("utf-8")]
    log_type = content.ContentType("text", "plain", {"charset": "utf8"})
    self.addDetail("log", content.Content(log_type,
        _get_log_contents_for_weird_testtools_api))
    self._log_file = log_buffer
    self._log_memento = trace.push_log_file(self._log_file)
    self.addCleanup(self._finishLogFile)
1704
def _finishLogFile(self):
    """Finished with the log file.

    Flushes the active trace file, if there is one, and pops the log file
    that ``_startLogFile`` pushed.
    """
    active_trace_file = trace._trace_file
    if active_trace_file:
        # flush the log file, to get all content
        active_trace_file.flush()
    trace.pop_log_file(self._log_memento)
1714
def thisFailsStrictLockCheck(self):
    """It is known that this test would fail with -Dstrict_locks.

    By default, all tests are run with strict lock checking unless
    -Edisable_lock_checks is supplied. However there are some tests which
    we know fail strict locks at this point that have not been fixed.
    They should call this function to disable the strict checking.

    This should be used sparingly, it is much better to fix the locking
    issues rather than papering over the problem by calling this function.
    """
    strict_flag = 'strict_locks'
    debug.debug_flags.discard(strict_flag)
1727
def overrideAttr(self, obj, attr_name, new=_unitialized_attr):
    """Overrides an object attribute restoring it after the test.

    :param obj: The object that will be mutated.

    :param attr_name: The attribute name we want to preserve/override in
        the object.

    :param new: The optional value we want to set the attribute to.  When
        omitted the attribute is left as it is and only its restoration
        is scheduled.

    :returns: The actual attr value.
    """
    original = getattr(obj, attr_name)
    # The actual value is captured by the call below
    self.addCleanup(setattr, obj, attr_name, original)
    override_requested = new is not _unitialized_attr
    if override_requested:
        setattr(obj, attr_name, new)
    return original
1746
def overrideEnv(self, name, new):
    """Set an environment variable, and reset it after the test.

    :param name: The environment variable name.

    :param new: The value to set the variable to. If None, the
        variable is deleted from the environment.

    :returns: The actual variable value.
    """
    previous = osutils.set_or_unset_env(name, new)
    # Restoring is the same operation with the previous value.
    self.addCleanup(osutils.set_or_unset_env, name, previous)
    return previous
1760
def _cleanEnvironment(self):
    """Apply every ``isolated_environ`` entry through overrideEnv."""
    for variable, isolated_value in isolated_environ.iteritems():
        self.overrideEnv(variable, isolated_value)
1764
def _restoreHooks(self):
    """Put back the hook points and lazy hooks saved by _clear_hooks.

    Every preserved hook object is set again on the object it was taken
    from, and the preserved lazy hook registry is reinstalled on
    ``bzrlib.hooks``.  The test then lets go of its own references.
    """
    for klass, (name, saved_hooks) in self._preserved_hooks.items():
        setattr(klass, name, saved_hooks)
    self._preserved_hooks.clear()
    bzrlib.hooks._lazy_hooks = self._preserved_lazy_hooks
    # Rebind rather than .clear(): the dict just reinstalled is this very
    # object, so clearing it would wipe the lazy hooks being restored.
    self._preserved_lazy_hooks = {}
1771
def knownFailure(self, reason):
    """This test has failed for some known reason.

    :raises KnownFailure: Always, carrying ``reason``.
    """
    failure = KnownFailure(reason)
    raise failure
1775
def _suppress_log(self):
1776
"""Remove the log info from details."""
1777
self.discardDetail('log')
1779
def _do_skip(self, result, reason):
1780
self._suppress_log()
1781
addSkip = getattr(result, 'addSkip', None)
1782
if not callable(addSkip):
1783
result.addSuccess(result)
1785
addSkip(self, reason)
1788
def _do_known_failure(self, result, e):
1789
self._suppress_log()
1790
err = sys.exc_info()
1791
addExpectedFailure = getattr(result, 'addExpectedFailure', None)
1792
if addExpectedFailure is not None:
1793
addExpectedFailure(self, err)
1795
result.addSuccess(self)
1798
def _do_not_applicable(self, result, e):
1800
reason = 'No reason given'
1803
self._suppress_log ()
1804
addNotApplicable = getattr(result, 'addNotApplicable', None)
1805
if addNotApplicable is not None:
1806
result.addNotApplicable(self, reason)
1808
self._do_skip(result, reason)
1811
def _report_skip(self, result, err):
    """Override the default _report_skip.

    We want to strip the 'log' detail. If we wait until _do_skip, it has
    already been formatted into the 'reason' string, and we can't pull it
    out.
    """
    self._suppress_log()
    # NOTE(review): self is passed explicitly to the parent implementation,
    # presumably because it is looked up as a plain (static) function -
    # confirm against the base class before changing this call.
    parent_report = super(TestCase, self)._report_skip
    parent_report(self, result, err)
1822
def _report_expected_failure(self, result, err):
    """Strip the log detail, then report as the base class does.

    See _report_skip for motivation.
    """
    self._suppress_log()
    # NOTE(review): self is passed explicitly to the parent implementation,
    # presumably because it is looked up as a plain (static) function -
    # confirm against the base class before changing this call.
    parent_report = super(TestCase, self)._report_expected_failure
    parent_report(self, result, err)
1831
def _do_unsupported_or_skip(self, result, e):
1833
self._suppress_log()
1834
addNotSupported = getattr(result, 'addNotSupported', None)
1835
if addNotSupported is not None:
1836
result.addNotSupported(self, reason)
1838
self._do_skip(result, reason)
1840
def time(self, callable, *args, **kwargs):
    """Run callable and accrue the time it takes to the benchmark time.

    If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
    this will cause lsprofile statistics to be gathered and stored in
    self._benchcalls.

    On first use a 'benchtime' detail is attached to the test.  The
    elapsed time is added even when the callable raises.

    :return: Whatever the callable returns.
    """
    if self._benchtime is None:
        self.addDetail('benchtime', content.Content(content.ContentType(
            "text", "plain"), lambda:[str(self._benchtime)]))
        self._benchtime = 0
    start = time.time()
    try:
        if not self._gather_lsprof_in_benchmarks:
            return callable(*args, **kwargs)
        # record this benchmark
        ret, stats = bzrlib.lsprof.profile(callable, *args, **kwargs)
        stats.sort()
        self._benchcalls.append(((callable, args, kwargs), stats))
        return ret
    finally:
        self._benchtime += time.time() - start
1864
def log(self, *args):
1868
"""Get a unicode string containing the log from bzrlib.trace.
1870
Undecodable characters are replaced.
1872
return u"".join(self.getDetails()['log'].iter_text())
1874
def requireFeature(self, feature):
    """This test requires a specific feature is available.

    :param feature: Object whose ``available()`` method is consulted.
    :raises UnavailableFeature: When feature is not available.
    """
    if feature.available():
        return
    raise UnavailableFeature(feature)
1882
def _run_bzr_autosplit(self, args, retcode, encoding, stdin,
                       working_dir):
    """Run bazaar command line, splitting up a string command line.

    A string ``args`` is split with shlex; anything else is passed on
    unchanged.  All other parameters go straight to ``_run_bzr_core``,
    whose result is returned.
    """
    if isinstance(args, basestring):
        # shlex don't understand unicode strings,
        # so args should be plain string (bialix 20070906)
        plain_command = str(args)
        args = list(shlex.split(plain_command))
    return self._run_bzr_core(args, retcode=retcode,
            encoding=encoding, stdin=stdin, working_dir=working_dir,
            )
1893
def _run_bzr_core(self, args, retcode, encoding, stdin,
        working_dir):
    """Run a bzr command in-process with captured stdout and stderr.

    A logging handler and a TestUIFactory are installed for the run, and
    the working directory is changed when ``working_dir`` is given; all
    three are undone afterwards.  Captured output is written to the test
    log.

    :param retcode: Expected return code; not checked when None.
    :param encoding: Encoding set on the captured streams; the user
        encoding is used when None.
    :return: A tuple (result, out, err).
    :raises KeyboardInterrupt: Re-raised with (out, err) as arguments when
        the command is interrupted.
    """
    # Clear chk_map page cache, because the contents are likely to mask
    # locking errors.
    chk_map.clear_cache()
    if encoding is None:
        encoding = osutils.get_user_encoding()
    stdout = StringIOWrapper()
    stderr = StringIOWrapper()
    stdout.encoding = encoding
    stderr.encoding = encoding

    def collect_and_log_output():
        # Fetch what was captured so far and copy it to the test log.
        captured_out = stdout.getvalue()
        captured_err = stderr.getvalue()
        if captured_out:
            self.log('output:\n%r', captured_out)
        if captured_err:
            self.log('errors:\n%r', captured_err)
        return captured_out, captured_err

    self.log('run bzr: %r', args)
    # FIXME: don't call into logging here
    handler = logging.StreamHandler(stderr)
    handler.setLevel(logging.INFO)
    logger = logging.getLogger('')
    logger.addHandler(handler)
    old_ui_factory = ui.ui_factory
    ui.ui_factory = TestUIFactory(stdin=stdin, stdout=stdout, stderr=stderr)

    cwd = None
    if working_dir is not None:
        cwd = osutils.getcwd()
        os.chdir(working_dir)

    try:
        try:
            result = self.apply_redirected(
                ui.ui_factory.stdin,
                stdout, stderr,
                _mod_commands.run_bzr_catch_user_errors,
                args)
        except KeyboardInterrupt:
            # Reraise KeyboardInterrupt with contents of redirected stdout
            # and stderr as arguments, for tests which are interested in
            # stdout and stderr and are expecting the exception.
            out, err = collect_and_log_output()
            raise KeyboardInterrupt(out, err)
    finally:
        logger.removeHandler(handler)
        ui.ui_factory = old_ui_factory
        if cwd is not None:
            os.chdir(cwd)

    out, err = collect_and_log_output()
    if retcode is not None:
        self.assertEquals(retcode, result,
                          message='Unexpected return code')
    return result, out, err
1954
def run_bzr(self, args, retcode=0, encoding=None, stdin=None,
            working_dir=None, error_regexes=[], output_encoding=None):
    """Invoke bzr, as if it were run from the command line.

    The argument list should not include the bzr program name - the
    first argument is normally the bzr command.  Arguments may be
    passed in three ways:

    1- A list of strings, eg ["commit", "a"].  This is recommended
    when the command contains whitespace or metacharacters, or
    is built up at run time.

    2- A single string, eg "add a".  This is the most convenient
    for hardcoded commands.

    This runs bzr through the interface that catches and reports
    errors, and with logging set to something approximating the
    default, so that error reporting can be checked.

    This should be the main method for tests that want to exercise the
    overall behavior of the bzr application (rather than a unit test
    or a functional test of the library.)

    This sends the stdout/stderr results into the test's log,
    where it may be useful for debugging.  See also run_captured.

    :keyword stdin: A string to be used as stdin for the command.
    :keyword retcode: The status code the command should return;
        default 0.
    :keyword working_dir: The directory to run the command in
    :keyword error_regexes: A list of expected error messages.  If
        specified they must be seen in the error output of the command.
    :return: A tuple (out, err) of the captured output.
    """
    run_outcome = self._run_bzr_autosplit(
        args=args,
        retcode=retcode,
        encoding=encoding,
        stdin=stdin,
        working_dir=working_dir,
        )
    # The return code was already checked by the runner; only the output
    # is of interest here.
    retcode, out, err = run_outcome
    self.assertIsInstance(error_regexes, (list, tuple))
    for expected_error in error_regexes:
        self.assertContainsRe(err, expected_error)
    return out, err
1999
def run_bzr_error(self, error_regexes, *args, **kwargs):
    """Run bzr, and check that stderr contains the supplied regexes

    :param error_regexes: Sequence of regular expressions which
        must each be found in the error output.
    :param args: command-line arguments for bzr
    :param kwargs: Keyword arguments which are interpreted by run_bzr
        This function changes the default value of retcode to be 3,
        since in most cases this is run when you expect bzr to fail.

    :return: (out, err) The actual output of running the command (in case
        you want to do more inspection)

    Examples of use::

        # Make sure that commit is failing because there is nothing to do
        self.run_bzr_error(['no changes to commit'],
                           ['commit', '-m', 'my commit comment'])
        # Make sure --strict is handling an unknown file, rather than
        # giving us the 'nothing to do' error
        self.build_tree(['unknown'])
        self.run_bzr_error(['Commit refused because there are unknown files'],
                           ['commit', '--strict', '-m', 'my commit comment'])
    """
    # Only supply the failure retcode when the caller did not choose one.
    if 'retcode' not in kwargs:
        kwargs['retcode'] = 3
    kwargs.update(error_regexes=error_regexes)
    out, err = self.run_bzr(*args, **kwargs)
    return out, err
2029
def run_bzr_subprocess(self, *args, **kwargs):
    """Run bzr in a subprocess for testing.

    This starts a new Python interpreter and runs bzr in there.
    This should only be used for tests that have a justifiable need for
    this isolation: e.g. they are testing startup time, or signal
    handling, or early startup code, etc.  Subprocess code can't be
    profiled or debugged so easily.

    Exactly one positional argument is accepted: either a list of
    arguments or a single command string that is split with shlex.

    :keyword retcode: The status code that is expected.  Defaults to 0.  If
        None is supplied, the status code is not checked.
    :keyword env_changes: A dictionary which lists changes to environment
        variables. A value of None will unset the env variable.
        The values must be strings. The change will only occur in the
        child, so you don't need to fix the environment after running.
    :keyword universal_newlines: Convert CRLF => LF
    :keyword allow_plugins: By default the subprocess is run with
        --no-plugins to ensure test reproducibility. Also, it is possible
        for system-wide plugins to create unexpected output on stderr,
        which can cause unnecessary test failures.
    :keyword working_dir: Directory passed on to start_bzr_subprocess.
    :raises ValueError: if anything other than one positional argument
        is given.
    """
    if len(args) != 1:
        raise ValueError("passing varargs to run_bzr_subprocess")
    command = args[0]
    if isinstance(command, list):
        args = command
    elif isinstance(command, basestring):
        args = list(shlex.split(command))
    process = self.start_bzr_subprocess(
        args,
        env_changes=kwargs.get('env_changes', {}),
        working_dir=kwargs.get('working_dir', None),
        allow_plugins=kwargs.get('allow_plugins', False))
    # We distinguish between retcode=None and retcode not passed.
    supplied_retcode = kwargs.get('retcode', 0)
    want_universal_newlines = kwargs.get('universal_newlines', False)
    return self.finish_bzr_subprocess(
        process, retcode=supplied_retcode,
        universal_newlines=want_universal_newlines,
        process_args=args)
2069
def start_bzr_subprocess(self, process_args, env_changes=None,
                         skip_if_plan_to_signal=False,
                         working_dir=None,
                         allow_plugins=False, stderr=subprocess.PIPE):
    """Start bzr in a subprocess for testing.

    This starts a new Python interpreter and runs bzr in there.
    This should only be used for tests that have a justifiable need for
    this isolation: e.g. they are testing startup time, or signal
    handling, or early startup code, etc.  Subprocess code can't be
    profiled or debugged so easily.

    :param process_args: a list of arguments to pass to the bzr executable,
        for example ``['--version']``.
    :param env_changes: A dictionary which lists changes to environment
        variables. A value of None will unset the env variable.
        The values must be strings. The change will only occur in the
        child, so you don't need to fix the environment after running.
    :param skip_if_plan_to_signal: raise TestSkipped when true and system
        doesn't support signalling subprocesses.
    :param working_dir: if not None, chdir there while the child is spawned;
        the previous directory is restored afterwards.
    :param allow_plugins: If False (default) pass --no-plugins to bzr.
    :param stderr: file to use for the subprocess's stderr.  Valid values
        are those valid for the stderr argument of `subprocess.Popen`.
        Default value is ``subprocess.PIPE``.

    :returns: Popen object for the started process.
    """
    if skip_if_plan_to_signal and os.name != "posix":
        raise TestSkipped("Sending signals not supported")

    if env_changes is None:
        env_changes = {}
    saved_env = {}

    bzr_path = self.get_bzr_path()

    previous_cwd = None
    if working_dir is not None:
        previous_cwd = osutils.getcwd()
        os.chdir(working_dir)

    try:
        # win32 subprocess doesn't support preexec_fn
        # so we will avoid using it on all platforms, just to
        # make sure the code path is used, and we don't break on win32
        for env_var, value in env_changes.iteritems():
            saved_env[env_var] = osutils.set_or_unset_env(env_var, value)
        # Include the subprocess's log file in the test details, in case
        # the test fails due to an error in the subprocess.
        self._add_subprocess_log(trace._get_bzr_log_filename())
        command = [sys.executable]
        # frozen executables don't need the path to bzr
        if getattr(sys, "frozen", None) is None:
            command.append(bzr_path)
        if not allow_plugins:
            command.append('--no-plugins')
        command.extend(process_args)
        process = self._popen(command, stdin=subprocess.PIPE,
                              stdout=subprocess.PIPE,
                              stderr=stderr)
    finally:
        # Undo the temporary changes made to this process's own state.
        for env_var, value in saved_env.iteritems():
            osutils.set_or_unset_env(env_var, value)
        if previous_cwd is not None:
            os.chdir(previous_cwd)

    return process
2144
def _add_subprocess_log(self, log_file_path):
    """Remember log_file_path so its content is attached at cleanup time."""
    if not self._log_files:
        # Register an addCleanup func.  We do this on the first call to
        # _add_subprocess_log rather than in TestCase.setUp so that this
        # addCleanup is registered after any cleanups for tempdirs that
        # subclasses might create, which will probably remove the log file
        # otherwise.
        self.addCleanup(self._subprocess_log_cleanup)
    # self._log_files is a set, so if a log file is reused we won't grab it
    # twice.
    self._log_files.add(log_file_path)
2156
def _subprocess_log_cleanup(self):
    """Attach the content of every recorded subprocess log as a detail."""
    for count, log_file_path in enumerate(self._log_files):
        # The file is read eagerly to avoid holding it open beyond
        # the life of this function, which might interfere with e.g.
        # cleaning tempdirs on Windows.
        # XXX: Testtools 0.9.5 doesn't have the content_from_file helper,
        # so the Content object is built by hand.
        with open(log_file_path, 'rb') as log_file:
            log_file_bytes = log_file.read()
        # Bind the bytes as a default argument: a plain closure would look
        # up log_file_bytes when called, so every detail would report the
        # content of the last log file read by this loop.
        detail_content = content.Content(
            content.ContentType("text", "plain", {"charset": "utf8"}),
            lambda data=log_file_bytes: [data])
        self.addDetail("start_bzr_subprocess-log-%d" % (count,),
                       detail_content)
2171
def _popen(self, *args, **kwargs):
    """Place a call to Popen.

    All arguments are forwarded unchanged to subprocess.Popen.  Tests can
    override this method to intercept the calls made to Popen for
    introspection.
    """
    process = subprocess.Popen(*args, **kwargs)
    return process
2179
def get_source_path(self):
    """Return the path of the directory containing bzrlib."""
    package_dir = os.path.dirname(bzrlib.__file__)
    return os.path.dirname(package_dir)
2183
def get_bzr_path(self):
    """Return the path of the 'bzr' executable for this test suite."""
    candidate = os.path.join(self.get_source_path(), "bzr")
    if os.path.isfile(candidate):
        return candidate
    # We are probably installed. Assume sys.argv is the right file
    return sys.argv[0]
2191
def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
                          universal_newlines=False, process_args=None):
    """Finish the execution of process.

    :param process: the Popen object returned from start_bzr_subprocess.
    :param retcode: The status code that is expected.  Defaults to 0.  If
        None is supplied, the status code is not checked.
    :param send_signal: an optional signal to send to the process.
    :param universal_newlines: Convert CRLF => LF
    :param process_args: the arguments the process was started with; only
        used to build log and failure messages.
    :returns: the captured stdout and stderr, in that order.
    """
    if send_signal is not None:
        os.kill(process.pid, send_signal)
    out, err = process.communicate()

    if universal_newlines:
        out, err = [stream.replace('\r\n', '\n') for stream in (out, err)]

    if retcode is None or retcode == process.returncode:
        return [out, err]

    # Unexpected exit status: record both streams before failing the test.
    if process_args is None:
        process_args = "(unknown args)"
    trace.mutter('Output of bzr %s:\n%s', process_args, out)
    trace.mutter('Error for bzr %s:\n%s', process_args, err)
    self.fail('Command bzr %s failed with retcode %s != %s'
              % (process_args, retcode, process.returncode))
    return [out, err]
2219
def check_tree_shape(self, tree, shape):
    """Compare a tree to a list of expected names.

    Directory entries are compared with a trailing '/', backslashes in
    paths are normalised to '/', and the root entry is ignored.

    Fail if they are not precisely equal.
    """
    remaining = list(shape)  # copy, so the caller's sequence is untouched
    unexpected = []
    for path, ie in tree.iter_entries_by_dir():
        name = path.replace('\\', '/')
        if ie.kind == 'directory':
            name += '/'
        if name == "/":
            continue  # ignore root entry
        if name in remaining:
            remaining.remove(name)
        else:
            unexpected.append(name)
    if remaining:
        self.fail("expected paths not found in inventory: %r" % remaining)
    if unexpected:
        self.fail("unexpected paths found in inventory: %r" % unexpected)
2241
def apply_redirected(self, stdin=None, stdout=None, stderr=None,
                     a_callable=None, *args, **kwargs):
    """Call callable with redirected std io pipes.

    :param stdin: file-like object installed as sys.stdin; an empty
        StringIO when None.
    :param stdout: file-like object installed as sys.stdout.  When None,
        the test's _log_file is used if it is set, else a fresh StringIO.
    :param stderr: file-like object installed as sys.stderr, with the same
        fallback rule as stdout.
    :param a_callable: the callable to invoke with args and kwargs.
    :raises ValueError: if a_callable is not callable.

    Returns the return code.
    """
    if not callable(a_callable):
        raise ValueError("a_callable must be callable.")
    if stdin is None:
        stdin = StringIO("")
    # Look the log file up once, with None as the getattr default.  The
    # stderr branch used to evaluate getattr(self, "_log_file", None is not
    # None), whose default is True, so it dereferenced a missing attribute.
    log_file = getattr(self, "_log_file", None)
    if stdout is None:
        if log_file is not None:
            stdout = log_file
        else:
            stdout = StringIO()
    if stderr is None:
        if log_file is not None:
            stderr = log_file
        else:
            stderr = StringIO()
    real_stdin = sys.stdin
    real_stdout = sys.stdout
    real_stderr = sys.stderr
    try:
        sys.stdout = stdout
        sys.stderr = stderr
        sys.stdin = stdin
        return a_callable(*args, **kwargs)
    finally:
        # Always put the real streams back, even if a_callable raised.
        sys.stdout = real_stdout
        sys.stderr = real_stderr
        sys.stdin = real_stdin
2273
def reduceLockdirTimeout(self):
    """Set lockdir's default timeout to 0 for the duration of the test.

    If LockContention occurs during a test it then does so quickly.
    Tests that expect to provoke LockContention errors should call this.
    """
    no_wait = 0
    self.overrideAttr(lockdir, '_DEFAULT_TIMEOUT_SECONDS', no_wait)
2281
def make_utf8_encoded_stringio(self, encoding_type=None):
    """Return a StringIO wrapped in a writer that encodes Unicode to UTF-8.

    :param encoding_type: the codec error handling scheme passed to the
        writer; 'strict' when None.
    """
    output_encoding = 'utf-8'
    error_handling = 'strict' if encoding_type is None else encoding_type
    writer_factory = codecs.getwriter(output_encoding)
    sio = writer_factory(StringIO(), errors=error_handling)
    # Advertise the encoding on the wrapper itself.
    sio.encoding = output_encoding
    return sio
2293
def disable_verb(self, verb):
    """Disable a smart server verb for one test.

    The handler registered for verb is removed from the request handler
    registry, and a cleanup is added that registers it again.
    """
    from bzrlib.smart import request
    handlers = request.request_handlers
    original_handler = handlers.get(verb)
    handlers.remove(verb)
    self.addCleanup(handlers.register, verb, original_handler)
2302
class CapturedCall(object):
    """A helper for capturing smart server calls for easy debug analysis."""

    def __init__(self, params, prefix_length):
        """Capture the call with params and skip prefix_length stack frames."""
        import traceback
        self.call = params
        # The last 5 frames are the __init__, the hook frame, and 3 smart
        # client frames. Beyond this we could get more clever, but this is good
        # enough for now.
        frames = traceback.extract_stack()[prefix_length:-5]
        self.stack = ''.join(traceback.format_list(frames))

    def __str__(self):
        return self.call.method

    def __repr__(self):
        return self.call.method

    def stack(self):
        # NOTE(review): shadowed on instances by the self.stack attribute
        # assigned in __init__; kept so the class attribute still exists.
        return self.stack
2325
class TestCaseWithMemoryTransport(TestCase):
2326
"""Common test class for tests that do not need disk resources.
2328
Tests that need disk resources should derive from TestCaseWithTransport.
2330
TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
2332
For TestCaseWithMemoryTransport the test_home_dir is set to the name of
2333
a directory which does not exist. This serves to help ensure test isolation
2334
is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
2335
must exist. However, TestCaseWithMemoryTransport does not offer local
2336
file defaults for the transport in tests, nor does it obey the command line
2337
override, so tests that accidentally write to the common directory should
2340
:cvar TEST_ROOT: Directory containing all temporary directories, plus
2341
a .bzr directory that stops us ascending higher into the filesystem.
2347
def __init__(self, methodName='runTest'):
    """Initialise the transport-related attributes of the test.

    These are assigned here rather than in setUp to allow test
    parameterization after test construction and before test execution.
    Variables that the parameterizer sets need to be ones that are not
    set by setUp, or setUp will trash them.
    """
    super(TestCaseWithMemoryTransport, self).__init__(methodName)
    self.transport_server = None
    self.transport_readonly_server = None
    self.vfs_transport_factory = default_transport
    self.__vfs_server = None
2357
def get_transport(self, relpath=None):
    """Return a writeable transport.

    The transport is opened on self.get_url(relpath), and the test fails
    if it turns out to be readonly.

    :param relpath: a path relative to the base url.
    """
    url = self.get_url(relpath)
    t = _mod_transport.get_transport(url)
    self.assertFalse(t.is_readonly())
    return t
2369
def get_readonly_transport(self, relpath=None):
    """Return a readonly transport for the test scratch space

    This can be used to test that operations which should only need
    readonly access in fact do not try to write.  The test fails if the
    transport opened on self.get_readonly_url(relpath) is writeable.

    :param relpath: a path relative to the base url.
    """
    url = self.get_readonly_url(relpath)
    t = _mod_transport.get_transport(url)
    self.assertTrue(t.is_readonly())
    return t
2381
def create_transport_readonly_server(self):
    """Create a transport server from class defined at init.

    The factory stored in self.transport_readonly_server is called with
    no arguments.  This is mostly a hook for daughter classes.
    """
    factory = self.transport_readonly_server
    return factory()
2388
def get_readonly_server(self):
    """Get the server instance for the readonly transport

    The server is created and started on first use, backed by the vfs
    only server, and the same instance is returned afterwards.

    This is useful for some tests with specific servers to do diagnostics.
    """
    if self.__readonly_server is not None:
        return self.__readonly_server
    if self.transport_readonly_server is None:
        # readonly decorator requested
        self.__readonly_server = test_server.ReadonlyServer()
    else:
        # explicit readonly transport.
        self.__readonly_server = self.create_transport_readonly_server()
    self.start_server(self.__readonly_server,
                      self.get_vfs_only_server())
    return self.__readonly_server
2404
def get_readonly_url(self, relpath=None):
    """Get a URL for the readonly transport.

    The base is the URL of the server returned by get_readonly_server().
    relpath provides for clients to get a path relative to the base url.
    These should only be downwards relative, not upwards.
    """
    readonly_server = self.get_readonly_server()
    return self._adjust_url(readonly_server.get_url(), relpath)
2415
def get_vfs_only_server(self):
    """Get the vfs only read/write server instance.

    This is useful for some tests with specific servers that need
    diagnostics.

    For TestCaseWithMemoryTransport this is always a MemoryServer, and there
    is no means to override it.  It is created and started on first use.
    """
    if self.__vfs_server is not None:
        return self.__vfs_server
    self.__vfs_server = memory.MemoryServer()
    self.start_server(self.__vfs_server)
    return self.__vfs_server
2429
def get_server(self):
    """Get the read/write server instance.

    This is useful for some tests with specific servers that need
    diagnostics.

    This is built from the self.transport_server factory. If that is None,
    or is the same object as self.vfs_transport_factory, the vfs only
    server is returned instead.
    """
    if self.__server is not None:
        return self.__server
    factory = self.transport_server
    if factory is None or factory is self.vfs_transport_factory:
        self.__server = self.get_vfs_only_server()
    else:
        # bring up a decorated means of access to the vfs only server.
        self.__server = factory()
        self.start_server(self.__server, self.get_vfs_only_server())
    return self.__server
2448
def _adjust_url(self, base, relpath):
    """Append relpath to base, which is a URL or maybe a local path.

    base is returned unchanged when relpath is None or '.'.  relpath
    provides for clients to get a path relative to the base url.
    These should only be downwards relative, not upwards.
    """
    if relpath is None or relpath == '.':
        return base
    if not base.endswith('/'):
        base = base + '/'
    # XXX: Really base should be a url; we did after all call
    # get_url()!  But sometimes it's just a path (from
    # LocalAbspathServer), and it'd be wrong to append urlescaped data
    # to a non-escaped local path.
    if base.startswith(('./', '/')):
        return base + relpath
    return base + urlutils.escape(relpath)
2469
def get_url(self, relpath=None):
    """Get a URL (or maybe a path) for the readwrite transport.

    The base is the URL of the server returned by get_server().
    relpath provides for clients to get a path relative to the base url.
    These should only be downwards relative, not upwards.
    """
    server = self.get_server()
    return self._adjust_url(server.get_url(), relpath)
2480
def get_vfs_only_url(self, relpath=None):
    """Get a URL (or maybe a path) for the plain old vfs transport.

    This will never be a smart protocol.  It always has all the
    capabilities of the local filesystem, but it might actually be a
    MemoryTransport or some other similar virtual filesystem.

    This is the backing transport (if any) of the server returned by
    get_url and get_readonly_url.

    :param relpath: provides for clients to get a path relative to the base
        url.  These should only be downwards relative, not upwards.
    :return: A URL
    """
    vfs_server = self.get_vfs_only_server()
    return self._adjust_url(vfs_server.get_url(), relpath)
2497
def _create_safety_net(self):
    """Make a fake bzr directory.

    A standalone working tree is created at TEST_ROOT.  This prevents any
    tests propagating up onto the TEST_ROOT directory's real branch.
    """
    bzrdir.BzrDir.create_standalone_workingtree(
        TestCaseWithMemoryTransport.TEST_ROOT)
2506
def _check_safety_net(self):
    """Check that the safety .bzr directory has not been touched.

    _make_test_root has created a .bzr directory to prevent tests from
    propagating. This method ensures that a test did not leak.

    :raises AssertionError: if the working tree at TEST_ROOT has a last
        revision other than 'null:'.
    """
    root = TestCaseWithMemoryTransport.TEST_ROOT
    self.permit_url(_mod_transport.get_transport(root).base)
    safety_tree = workingtree.WorkingTree.open(root)
    if safety_tree.last_revision() == 'null:':
        return
    # The current test has modified the /bzr directory, we need to
    # recreate a new one or all the following tests will fail.
    # If you need to inspect its content uncomment the following line
    # import pdb; pdb.set_trace()
    _rmtree_temp_dir(root + '/.bzr', test_id=self.id())
    self._create_safety_net()
    raise AssertionError('%s/.bzr should not be modified' % root)
2525
def _make_test_root(self):
    """Create the shared TEST_ROOT directory on first use.

    Every call also permits access to TEST_ROOT and schedules the safety
    net check as a cleanup.
    """
    cls = TestCaseWithMemoryTransport
    if cls.TEST_ROOT is None:
        # Watch out for tricky test dir (on OSX /tmp -> /private/tmp)
        root = osutils.realpath(osutils.mkdtemp(prefix='testbzr-',
                                                suffix='.tmp'))
        cls.TEST_ROOT = root

        self._create_safety_net()

        # The same directory is used by all tests, and we're not
        # specifically told when all tests are finished.  This will do.
        atexit.register(_rmtree_temp_dir, root)

    self.permit_dir(cls.TEST_ROOT)
    self.addCleanup(self._check_safety_net)
2541
def makeAndChdirToTestDir(self):
    """Create a temporary directories for this one test.

    This must set self.test_home_dir and self.test_dir and chdir to
    self.test_dir.

    For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test,
    and test_home_dir names a path below it that is not created here.
    """
    root = TestCaseWithMemoryTransport.TEST_ROOT
    os.chdir(root)
    self.test_dir = root
    self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
    self.permit_dir(self.test_dir)
2554
def make_branch(self, relpath, format=None):
    """Create a branch on the transport at relpath."""
    repository = self.make_repository(relpath, format=format)
    control_dir = repository.bzrdir
    return control_dir.create_branch()
2559
def make_bzrdir(self, relpath, format=None):
    """Initialise a control directory at relpath and return it.

    :param relpath: passed to self.get_url() to locate the directory.
    :param format: a format object, the registry name of one, or None
        for 'default'.
    :raises TestSkipped: if the format cannot be initialised.
    """
    try:
        # might be a relative or absolute path
        maybe_a_url = self.get_url(relpath)
        segments = maybe_a_url.rsplit('/', 1)
        t = _mod_transport.get_transport(maybe_a_url)
        has_leaf = len(segments) > 1 and segments[-1] not in ('', '.')
        if has_leaf:
            t.ensure_base()
        if format is None:
            format = 'default'
        if isinstance(format, basestring):
            format = bzrdir.format_registry.make_bzrdir(format)
        return format.initialize_on_transport(t)
    except errors.UninitializableFormat:
        raise TestSkipped("Format %s is not initializable." % format)
2575
def make_repository(self, relpath, shared=False, format=None):
    """Create a repository on our default transport at relpath.

    Note that relpath must be a relative path, not a full url.
    """
    # FIXME: If you create a remoterepository this returns the underlying
    # real format, which is incorrect.  Actually we should make sure that
    # RemoteBzrDir returns a RemoteRepository.
    # maybe  mbp 20070410
    control_dir = self.make_bzrdir(relpath, format=format)
    return control_dir.create_repository(shared=shared)
2587
def make_smart_server(self, path, backing_server=None):
    """Start a smart TCP server and return a transport to path on it.

    :param backing_server: the server to back the smart server with;
        self.get_server() when None.
    """
    if backing_server is None:
        backing_server = self.get_server()
    smart_server = test_server.SmartTCPServer_for_testing()
    self.start_server(smart_server, backing_server)
    server_transport = _mod_transport.get_transport(smart_server.get_url())
    remote_transport = server_transport.clone(path)
    return remote_transport
2596
def make_branch_and_memory_tree(self, relpath, format=None):
    """Create a branch on the default transport and a MemoryTree for it."""
    branch = self.make_branch(relpath, format=format)
    return memorytree.MemoryTree.create_on_branch(branch)
2601
def make_branch_builder(self, relpath, format=None):
    """Create a branch at relpath and return a BranchBuilder for it."""
    new_branch = self.make_branch(relpath, format=format)
    return branchbuilder.BranchBuilder(branch=new_branch)
2605
def overrideEnvironmentForTesting(self):
    """Point HOME and BZR_HOME at self.test_home_dir."""
    home = self.test_home_dir
    if isinstance(home, unicode):
        # Unicode paths are encoded with the filesystem encoding before
        # being handed to overrideEnv.
        home = home.encode(sys.getfilesystemencoding())
    for variable in ('HOME', 'BZR_HOME'):
        self.overrideEnv(variable, home)
2613
def setUp(self):
    """Prepare the test root, working directory and environment."""
    super(TestCaseWithMemoryTransport, self).setUp()

    # Ensure that ConnectedTransport doesn't leak sockets
    def get_transport_with_cleanup(*args, **kwargs):
        opened = orig_get_transport(*args, **kwargs)
        if isinstance(opened, _mod_transport.ConnectedTransport):
            self.addCleanup(opened.disconnect)
        return opened

    # overrideAttr hands back the replaced function, which the wrapper
    # above calls through.
    orig_get_transport = self.overrideAttr(_mod_transport, 'get_transport',
                                           get_transport_with_cleanup)
    self._make_test_root()
    # Return to the current directory once the test is over.
    self.addCleanup(os.chdir, os.getcwdu())
    self.makeAndChdirToTestDir()
    self.overrideEnvironmentForTesting()
    self.__readonly_server = None
    self.__server = None
    self.reduceLockdirTimeout()
2631
def setup_smart_server_with_call_log(self):
    """Sets up a smart server as the transport server with a call log.

    Each call seen by the smart client 'call' hook is appended to
    self.hpss_calls as a CapturedCall.
    """
    import traceback
    self.transport_server = test_server.SmartTCPServer_for_testing
    self.hpss_calls = []
    # Skip the current stack down to the caller of
    # setup_smart_server_with_call_log
    prefix_length = len(traceback.extract_stack()) - 2

    def capture_hpss_call(params):
        captured = CapturedCall(params, prefix_length)
        self.hpss_calls.append(captured)

    client._SmartClient.hooks.install_named_hook(
        'call', capture_hpss_call, None)
2645
def reset_smart_call_log(self):
    """Replace self.hpss_calls with a new empty list."""
    self.hpss_calls = list()
2649
class TestCaseInTempDir(TestCaseWithMemoryTransport):
2650
"""Derived class that runs a test within a temporary directory.
2652
This is useful for tests that need to create a branch, etc.
2654
The directory is created in a slightly complex way: for each
2655
Python invocation, a new temporary top-level directory is created.
2656
All test cases create their own directory within that. If the
2657
tests complete successfully, the directory is removed.
2659
:ivar test_base_dir: The path of the top-level directory for this
2660
test, which contains a home directory and a work directory.
2662
:ivar test_home_dir: An initially empty directory under test_base_dir
2663
which is used as $HOME for this test.
2665
:ivar test_dir: A directory under test_base_dir used as the current
2666
directory when the test proper is run.
2669
OVERRIDE_PYTHON = 'python'
2671
def check_file_contents(self, filename, expect):
    """Fail the test unless the file's content equals expect."""
    self.log("check contents of file %s" % filename)
    with open(filename) as f:
        contents = f.read()
    if contents == expect:
        return
    self.log("expected: %r" % expect)
    self.log("actually: %r" % contents)
    self.fail("contents of %s not as expected" % filename)
2683
def _getTestDirPrefix(self):
    """Return a directory name derived from the test id.

    The name is used for a directory within the top level test directory.
    """
    test_id = self.id()
    if sys.platform not in ('win32', 'cygwin'):
        return re.sub('[/]', '_', test_id)
    sanitized = re.sub('[<>*=+",:;_/\\-]', '_', test_id)
    # windows is likely to have path-length limits so use a short name
    return sanitized[-30:]
2693
def makeAndChdirToTestDir(self):
    """See TestCaseWithMemoryTransport.makeAndChdirToTestDir().

    For TestCaseInTempDir we create a temporary directory based on the test
    name and then create two subdirs - test and home under it.
    """
    name_prefix = osutils.pathjoin(TestCaseWithMemoryTransport.TEST_ROOT,
                                   self._getTestDirPrefix())
    candidate = name_prefix
    for attempt in range(100):
        if not os.path.exists(candidate):
            # now create test and home directories within this dir
            self.test_base_dir = candidate
            self.addCleanup(self.deleteTestDir)
            os.mkdir(self.test_base_dir)
            break
        candidate = name_prefix + '_' + str(attempt)
    self.permit_dir(self.test_base_dir)
    # 'sprouting' and 'init' of a branch both walk up the tree to find
    # stacking policy to honour; create a bzr dir with an unshared
    # repository (but not a branch - our code would be trying to escape
    # then!) to stop them, and permit it to be read.
    # control = bzrdir.BzrDir.create(self.test_base_dir)
    # control.create_repository()
    self.test_home_dir = self.test_base_dir + '/home'
    os.mkdir(self.test_home_dir)
    self.test_dir = self.test_base_dir + '/work'
    os.mkdir(self.test_dir)
    os.chdir(self.test_dir)
    # put name of test inside
    with open(self.test_base_dir + '/name', 'w') as name_file:
        name_file.write(self.id())
2730
def deleteTestDir(self):
    """Chdir to TEST_ROOT and remove this test's base directory."""
    test_root = TestCaseWithMemoryTransport.TEST_ROOT
    os.chdir(test_root)
    _rmtree_temp_dir(self.test_base_dir, test_id=self.id())
2734
def build_tree(self, shape, line_endings='binary', transport=None):
    """Build a test tree according to a pattern.

    shape is a sequence of file specifications.  If the final
    character is '/', a directory is created.

    This assumes that all the elements in the tree being built are new.

    This doesn't add anything to a branch.

    :type shape:    list or tuple.
    :param line_endings: Either 'binary' or 'native'
        in binary mode, exact contents are written in native mode, the
        line endings match the default platform endings.
    :param transport: A transport to write to, for building trees on VFS's.
        If the transport is readonly or None, "." is opened automatically.
    :return: None
    """
    if type(shape) not in (list, tuple):
        raise AssertionError("Parameter 'shape' should be "
            "a list or a tuple. Got %r instead" % (shape,))
    # It's OK to just create them using forward slashes on windows.
    if transport is None or transport.is_readonly():
        transport = _mod_transport.get_transport(".")
    for name in shape:
        self.assertIsInstance(name, basestring)
        if name[-1] == '/':
            transport.mkdir(urlutils.escape(name[:-1]))
            continue
        # line_endings is only validated once a file entry is reached.
        if line_endings == 'binary':
            end = '\n'
        elif line_endings == 'native':
            end = os.linesep
        else:
            raise errors.BzrError(
                'Invalid line ending request %r' % line_endings)
        payload = "contents of %s%s" % (name.encode('utf-8'), end)
        transport.put_bytes_non_atomic(urlutils.escape(name), payload)
2773
build_tree_contents = staticmethod(treeshape.build_tree_contents)
2775
def assertInWorkingTree(self, path, root_path='.', tree=None):
    """Assert whether path or paths are in the WorkingTree

    :param path: a single path, or an iterable of paths that are each
        checked in turn.
    :param root_path: where to open the WorkingTree when tree is None.
    :param tree: the tree to look in.
    """
    if tree is None:
        tree = workingtree.WorkingTree.open(root_path)
    if isinstance(path, basestring):
        self.assertIsNot(tree.path2id(path), None,
                         path+' not in working tree.')
        return
    for p in path:
        self.assertInWorkingTree(p, tree=tree)
2786
def assertNotInWorkingTree(self, path, root_path='.', tree=None):
    """Assert whether path or paths are not in the WorkingTree

    :param path: a single path, or an iterable of paths that are each
        checked in turn.
    :param root_path: where to open the WorkingTree when tree is None.
    :param tree: the tree to look in.
    """
    if tree is None:
        tree = workingtree.WorkingTree.open(root_path)
    if isinstance(path, basestring):
        self.assertIs(tree.path2id(path), None, path+' in working tree.')
        return
    for p in path:
        self.assertNotInWorkingTree(p, tree=tree)
2797
class TestCaseWithTransport(TestCaseInTempDir):
2798
"""A test case that provides get_url and get_readonly_url facilities.
2800
These back onto two transport servers, one for readonly access and one for
2803
If no explicit class is provided for readonly access, a
2804
ReadonlyTransportDecorator is used instead which allows the use of non disk
2805
based read write transports.
2807
If an explicit class is provided for readonly access, that server and the
2808
readwrite one must both define get_url() as resolving to os.getcwd().
2811
def get_vfs_only_server(self):
    """See TestCaseWithMemoryTransport.

    This is useful for some tests with specific servers that need
    diagnostics.  Here the server is built by calling
    self.vfs_transport_factory, and is created and started on first use.
    """
    if self.__vfs_server is not None:
        return self.__vfs_server
    self.__vfs_server = self.vfs_transport_factory()
    self.start_server(self.__vfs_server)
    return self.__vfs_server
2822
def make_branch_and_tree(self, relpath, format=None):
    """Create a branch on the transport and a tree locally.

    If the transport is not a LocalTransport, the Tree can't be created on
    the transport.  In that case if the vfs_transport_factory is
    LocalURLServer the working tree is created in the local
    directory backing the transport, and the returned tree's branch and
    repository will also be accessed locally. Otherwise a lightweight
    checkout is created and returned.

    We do this because we can't physically create a tree in the local
    path, with a branch reference to the transport_factory url, and
    a branch + repository in the vfs_transport, unless the vfs_transport
    namespace is distinct from the local disk - the two branch objects
    would collide. While we could construct a tree with its branch object
    pointing at the transport_factory transport in memory, reopening it
    would behave unexpectedly, and has in the past caused testing bugs
    when we tried to do it that way.

    :param format: The BzrDirFormat.
    :returns: the WorkingTree.
    """
    # TODO: always use the local disk path for the working tree,
    # this obviously requires a format that supports branch references
    # so check for that by checking bzrdir.BzrDirFormat.get_default_format()
    # for compatibility.
    b = self.make_branch(relpath, format=format)
    try:
        return b.bzrdir.create_workingtree()
    except errors.NotLocalUrl:
        # We can only make working trees locally at the moment.  If the
        # transport can't support them, then we keep the non-disk-backed
        # branch and create a local checkout.
        if self.vfs_transport_factory is not test_server.LocalURLServer:
            return b.create_checkout(relpath, lightweight=True)
        # the branch is colocated on disk, we cannot create a checkout.
        # hopefully callers will expect this.
        local_url = self.get_vfs_only_url(relpath)
        local_controldir = bzrdir.BzrDir.open(local_url)
        wt = local_controldir.create_workingtree()
        if wt.branch._format != b._format:
            wt._branch = b
            # Make sure that assigning to wt._branch fixes wt.branch,
            # in case the implementation details of workingtree objects
            # change.
            self.assertIs(b, wt.branch)
        return wt
2870
def assertIsDirectory(self, relpath, transport):
    """Assert that relpath within transport is a directory.

    The test fails if transport.stat() reports no such file, or if the
    reported mode is not a directory mode.

    This may not be possible on all transports; in that case it propagates
    a TransportNotPossible.
    """
    try:
        st = transport.stat(relpath)
        mode = st.st_mode
    except errors.NoSuchFile:
        self.fail("path %s is not a directory; no such file"
                  % (relpath))
    is_directory = stat.S_ISDIR(mode)
    if not is_directory:
        self.fail("path %s is not a directory; has mode %#o"
                  % (relpath, mode))
2885
def assertTreesEqual(self, left, right):
    """Check that left and right have the same content and properties.

    The parent id lists are compared directly; the content is compared
    through the delta returned by left.changes_from(right).
    """
    # we use a tree delta to check for equality of the content, and we
    # manually check for equality of other things such as the parents list.
    self.assertEqual(left.get_parent_ids(), right.get_parent_ids())
    differences = left.changes_from(right)
    self.assertFalse(differences.has_changed(),
        "Trees %r and %r are different: %r" % (left, right, differences))
2895
def setUp(self):
    """Run the parent setUp, then reset the cached vfs server to None."""
    super(TestCaseWithTransport, self).setUp()
    self.__vfs_server = None
2898
def disable_missing_extensions_warning(self):
    """Some tests expect a precise stderr content.

    There is no point in forcing them to duplicate the extension related
    warning, so set the 'ignore_missing_extensions' user option to True in
    the global configuration.
    """
    config.GlobalConfig().set_user_option('ignore_missing_extensions', True)
2907
class ChrootedTestCase(TestCaseWithTransport):
    """A support class that provides readonly urls outside the local namespace.

    This is done by checking if self.vfs_transport_factory is a MemoryServer.
    If it is then we are chrooted already, if it is not then an HttpServer is
    used for readonly urls.

    TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
                       be used without needed to redo it when a different
                       subclass is in use ?
    """

    def setUp(self):
        """Select HttpServer as readonly server unless memory-backed."""
        from bzrlib.tests import http_server
        super(ChrootedTestCase, self).setUp()
        if not self.vfs_transport_factory == memory.MemoryServer:
            self.transport_readonly_server = http_server.HttpServer
2926
def condition_id_re(pattern):
    """Create a condition filter which performs a re check on a test's id.

    :param pattern: A regular expression string.
    :return: A callable taking a test and returning the result of searching
        its id for the pattern: a match object (true) when the re matches,
        None otherwise.
    """
    filter_re = re.compile(pattern, 0)
    def condition(test):
        test_id = test.id()
        return filter_re.search(test_id)
    return condition
2939
def condition_isinstance(klass_or_klass_list):
    """Create a condition filter which returns isinstance(param, klass).

    :param klass_or_klass_list: Passed unchanged as the second argument of
        isinstance().
    :return: A callable which when called with one parameter obj return the
        result of isinstance(obj, klass_or_klass_list).
    """
    def condition(obj):
        return isinstance(obj, klass_or_klass_list)
    return condition
2950
def condition_id_in_list(id_list):
    """Create a condition filter which verify that test's id in a list.

    :param id_list: A TestIdList object.
    :return: A callable that returns True if the test's id appears in the list.
    """
    def condition(test):
        return id_list.includes(test.id())
    return condition
2961
def condition_id_startswith(starts):
    """Create a condition filter verifying that test's id starts with a string.

    :param starts: A list of string.
    :return: A callable that returns True if the test's id starts with one of
        the given strings, False otherwise.
    """
    def condition(test):
        test_id = test.id()
        return any(test_id.startswith(start) for start in starts)
    return condition
2976
def exclude_tests_by_condition(suite, condition):
    """Create a test suite which excludes some tests from suite.

    :param suite: The suite to get tests from.
    :param condition: A callable whose result evaluates True when called with a
        test case which should be excluded from the result.
    :return: A suite which contains the tests found in suite that fail
        condition.
    """
    result = [test for test in iter_suite_tests(suite)
              if not condition(test)]
    return TestUtil.TestSuite(result)
2992
def filter_suite_by_condition(suite, condition):
    """Create a test suite by filtering another one.

    :param suite: The source suite.
    :param condition: A callable whose result evaluates True when called with a
        test case which should be included in the result.
    :return: A suite which contains the tests found in suite that pass
        condition.
    """
    result = [test for test in iter_suite_tests(suite) if condition(test)]
    return TestUtil.TestSuite(result)
3008
def filter_suite_by_re(suite, pattern):
    """Create a test suite by filtering another one.

    :param suite: the source suite
    :param pattern: pattern that names must match
    :returns: the newly created suite
    """
    condition = condition_id_re(pattern)
    result_suite = filter_suite_by_condition(suite, condition)
    return result_suite
3020
def filter_suite_by_id_list(suite, test_id_list):
    """Create a test suite by filtering another one.

    :param suite: The source suite.
    :param test_id_list: The test ids to keep; handed to
        condition_id_in_list(), which calls its includes() method.
    :returns: the newly created suite
    """
    condition = condition_id_in_list(test_id_list)
    result_suite = filter_suite_by_condition(suite, condition)
    return result_suite
3032
def filter_suite_by_id_startswith(suite, start):
    """Create a test suite by filtering another one.

    :param suite: The source suite.
    :param start: A list of string the test id must start with one of.
    :returns: the newly created suite
    """
    condition = condition_id_startswith(start)
    result_suite = filter_suite_by_condition(suite, condition)
    return result_suite
3044
def exclude_tests_by_re(suite, pattern):
    """Create a test suite which excludes some tests from suite.

    :param suite: The suite to get tests from.
    :param pattern: A regular expression string. Test ids that match this
        pattern will be excluded from the result.
    :return: A TestSuite that contains all the tests from suite without the
        tests that matched pattern. The order of tests is the same as it was in
        suite.
    """
    return exclude_tests_by_condition(suite, condition_id_re(pattern))
3057
def preserve_input(something):
    """A helper for performing test suite transformation chains.

    :param something: Anything you want to preserve.
    :return: Something.
    """
    return something
3066
def randomize_suite(suite):
    """Return a new TestSuite with suite's tests in random order.

    The tests in the input suite are flattened into a single suite in order to
    accomplish this. Any nested TestSuites are removed to provide global
    randomness.

    The shuffle uses the module-level random generator, so callers control
    reproducibility by seeding it beforehand.
    """
    tests = list(iter_suite_tests(suite))
    random.shuffle(tests)
    return TestUtil.TestSuite(tests)
3078
def split_suite_by_condition(suite, condition):
    """Split a test suite into two by a condition.

    :param suite: The suite to split.
    :param condition: The condition to match on. Tests that match this
        condition are returned in the first test suite, ones that do not match
        are in the second suite.
    :return: A tuple of two test suites, where the first contains tests from
        suite matching the condition, and the second contains the remainder
        from suite. The order within each output suite is the same as it was in
        suite.
    """
    matched = []
    did_not_match = []
    for test in iter_suite_tests(suite):
        if condition(test):
            matched.append(test)
        else:
            did_not_match.append(test)
    return TestUtil.TestSuite(matched), TestUtil.TestSuite(did_not_match)
3100
def split_suite_by_re(suite, pattern):
    """Split a test suite into two by a regular expression.

    :param suite: The suite to split.
    :param pattern: A regular expression string. Test ids that match this
        pattern will be in the first test suite returned, and the others in the
        second test suite returned.
    :return: A tuple of two test suites, where the first contains tests from
        suite matching pattern, and the second contains the remainder from
        suite. The order within each output suite is the same as it was in
        suite.
    """
    return split_suite_by_condition(suite, condition_id_re(pattern))
3115
def run_suite(suite, name='test', verbose=False, pattern=".*",
              stop_on_failure=False,
              transport=None, lsprof_timed=None, bench_history=None,
              matching_tests_first=None,
              list_only=False,
              random_seed=None,
              exclude_pattern=None,
              strict=False,
              runner_class=None,
              suite_decorators=None,
              stream=None,
              result_decorators=None,
              ):
    """Run a test suite for bzr selftest.

    :param suite: The suite to decorate and run.
    :param name: Not used by this function.
    :param verbose: When true the runner gets verbosity 2, otherwise 1.
    :param pattern: Regular expression used either to filter the tests or,
        when matching_tests_first is true, to move matching tests first.
    :param stop_on_failure: Stored on the runner as 'stop_on_failure'.
    :param transport: Not used by this function.
    :param lsprof_timed: Stored on TestCase._gather_lsprof_in_benchmarks.
    :param list_only: When true, write the test ids to stream instead of
        running them and return True.
    :param random_seed: When not None, the tests order is randomised.
    :param exclude_pattern: When not None, matching tests are excluded.
    :param strict: Passed to the runner; also selects
        wasStrictlySuccessful() over wasSuccessful() for the return value.
    :param runner_class: The class of runner to use. Must support the
        constructor arguments passed by run_suite which are more than standard
        python uses. Defaults to TextTestRunner.
    :param suite_decorators: Extra suite decorators applied after the built
        in ones.
    :param stream: Output stream, sys.stdout when None.
    :param result_decorators: Passed to the runner constructor.
    :return: A boolean indicating success.
    """
    TestCase._gather_lsprof_in_benchmarks = lsprof_timed
    if verbose:
        verbosity = 2
    else:
        verbosity = 1
    if runner_class is None:
        runner_class = TextTestRunner
    if stream is None:
        stream = sys.stdout
    runner = runner_class(stream=stream,
                          descriptions=0,
                          verbosity=verbosity,
                          bench_history=bench_history,
                          strict=strict,
                          result_decorators=result_decorators,
                          )
    runner.stop_on_failure = stop_on_failure
    # built in decorator factories:
    decorators = [
        random_order(random_seed, runner),
        exclude_tests(exclude_pattern),
        ]
    if matching_tests_first:
        decorators.append(tests_first(pattern))
    else:
        decorators.append(filter_tests(pattern))
    if suite_decorators:
        decorators.extend(suite_decorators)
    # tell the result object how many tests will be running: (except if
    # --parallel=fork is being used. Robert said he will provide a better
    # progress design later -- vila 20090817)
    if fork_decorator not in decorators:
        decorators.append(CountingDecorator)
    for decorator in decorators:
        suite = decorator(suite)
    if list_only:
        # Done after test suite decoration to allow randomisation etc
        # to take effect, though that is of marginal benefit.
        if verbosity >= 2:
            stream.write("Listing tests only ...\n")
        for t in iter_suite_tests(suite):
            stream.write("%s\n" % (t.id()))
        return True
    result = runner.run(suite)
    if strict:
        return result.wasStrictlySuccessful()
    else:
        return result.wasSuccessful()
3185
# A registry where get() returns a suite decorator.
parallel_registry = registry.Registry()


def fork_decorator(suite):
    """Wrap suite so that its tests are run in forked child processes.

    :raises errors.BzrCommandError: if the os module has no fork().
    :return: suite itself when the local concurrency is 1, otherwise a
        testtools ConcurrentTestSuite splitting the work with fork_for_tests.
    """
    if getattr(os, "fork", None) is None:
        raise errors.BzrCommandError("platform does not support fork,"
            " try --parallel=subprocess instead.")
    concurrency = osutils.local_concurrency()
    if concurrency == 1:
        return suite
    from testtools import ConcurrentTestSuite
    return ConcurrentTestSuite(suite, fork_for_tests)
parallel_registry.register('fork', fork_decorator)
3201
def subprocess_decorator(suite):
    """Wrap suite so that its tests are run in bzr selftest subprocesses.

    :return: suite itself when the local concurrency is 1, otherwise a
        testtools ConcurrentTestSuite splitting the work with
        reinvoke_for_tests.
    """
    concurrency = osutils.local_concurrency()
    if concurrency == 1:
        return suite
    from testtools import ConcurrentTestSuite
    return ConcurrentTestSuite(suite, reinvoke_for_tests)
parallel_registry.register('subprocess', subprocess_decorator)
3210
def exclude_tests(exclude_pattern):
    """Return a test suite decorator that excludes tests.

    :param exclude_pattern: Pattern handed to ExcludeDecorator; when None the
        identity decorator is returned and nothing is excluded.
    """
    if exclude_pattern is None:
        return identity_decorator
    def decorator(suite):
        return ExcludeDecorator(suite, exclude_pattern)
    return decorator
3219
def filter_tests(pattern):
    """Return a test suite decorator keeping only tests matching pattern.

    :param pattern: Pattern handed to FilterTestsDecorator; the catch-all
        '.*' short-circuits to the identity decorator.
    """
    if pattern == '.*':
        return identity_decorator
    def decorator(suite):
        return FilterTestsDecorator(suite, pattern)
    return decorator
3227
def random_order(random_seed, runner):
    """Return a test suite decorator factory for randomising tests order.

    :param random_seed: now, a string which casts to a long, or a long.
        When None the identity decorator is returned.
    :param runner: A test runner with a stream attribute to report on.
    """
    if random_seed is None:
        return identity_decorator
    def decorator(suite):
        return RandomDecorator(suite, random_seed, runner.stream)
    return decorator
3240
def tests_first(pattern):
    """Return a test suite decorator moving tests matching pattern first.

    :param pattern: Pattern handed to TestFirstDecorator; the catch-all
        '.*' short-circuits to the identity decorator.
    """
    if pattern == '.*':
        return identity_decorator
    def decorator(suite):
        return TestFirstDecorator(suite, pattern)
    return decorator
3248
def identity_decorator(suite):
    """Return suite."""
    return suite
3253
class TestDecorator(TestUtil.TestSuite):
    """A decorator for TestCase/TestSuite objects.

    Usually, subclasses should override __iter__(used when flattening test
    suites), which we do to filter, reorder, parallelise and so on, run() and
    debug().
    """

    def __init__(self, suite):
        TestUtil.TestSuite.__init__(self)
        self.addTest(suite)

    def countTestCases(self):
        """Return the sum of countTestCases() over the iterated tests."""
        cases = 0
        for test in self:
            cases += test.countTestCases()
        return cases

    def debug(self):
        """Call debug() on every iterated test."""
        for test in self:
            test.debug()

    def run(self, result):
        """Run the iterated tests until result.shouldStop is set.

        :return: result.
        """
        # Use iteration on self, not self._tests, to allow subclasses to hook
        # into __iter__.
        for test in self:
            if result.shouldStop:
                break
            test.run(result)
        return result
3285
class CountingDecorator(TestDecorator):
    """A decorator which calls result.progress(self.countTestCases)."""

    def run(self, result):
        """Report the number of tests through result.progress, then run.

        The report is skipped when result has no callable 'progress'
        attribute.
        """
        progress_method = getattr(result, 'progress', None)
        if callable(progress_method):
            progress_method(self.countTestCases(), SUBUNIT_SEEK_SET)
        return super(CountingDecorator, self).run(result)
3295
class ExcludeDecorator(TestDecorator):
    """A decorator which excludes test matching an exclude pattern."""

    def __init__(self, suite, exclude_pattern):
        TestDecorator.__init__(self, suite)
        self.exclude_pattern = exclude_pattern
        # Set once the exclusion has been applied to self._tests.
        self.excluded = False

    def __iter__(self):
        """Iterate the tests, applying the exclusion on first use only."""
        if self.excluded:
            return iter(self._tests)
        # Flag first: exclude_tests_by_re() iterates self again.
        self.excluded = True
        suite = exclude_tests_by_re(self, self.exclude_pattern)
        del self._tests[:]
        self.addTests(suite)
        return iter(self._tests)
3313
class FilterTestsDecorator(TestDecorator):
    """A decorator which filters tests to those matching a pattern."""

    def __init__(self, suite, pattern):
        TestDecorator.__init__(self, suite)
        self.pattern = pattern
        # Set once the filter has been applied to self._tests.
        self.filtered = False

    def __iter__(self):
        """Iterate the tests, applying the filter on first use only."""
        if self.filtered:
            return iter(self._tests)
        # Flag first: filter_suite_by_re() iterates self again.
        self.filtered = True
        suite = filter_suite_by_re(self, self.pattern)
        del self._tests[:]
        self.addTests(suite)
        return iter(self._tests)
3331
class RandomDecorator(TestDecorator):
    """A decorator which randomises the order of its tests."""

    def __init__(self, suite, random_seed, stream):
        TestDecorator.__init__(self, suite)
        self.random_seed = random_seed
        # Set once the tests have been shuffled.
        self.randomised = False
        self.stream = stream

    def __iter__(self):
        """Iterate the tests, shuffling them on first use only.

        The seed actually used is written to self.stream so that the run can
        be reproduced.
        """
        if self.randomised:
            return iter(self._tests)
        # Flag first: randomize_suite() iterates self again.
        self.randomised = True
        self.stream.write("Randomizing test order using seed %s\n\n" %
            (self.actual_seed()))
        # Initialise the random number generator.
        random.seed(self.actual_seed())
        suite = randomize_suite(self)
        del self._tests[:]
        self.addTests(suite)
        return iter(self._tests)

    def actual_seed(self):
        """Return the seed to use, normalising self.random_seed in place."""
        if self.random_seed == "now":
            # We convert the seed to a long to make it reuseable across
            # invocations (because the user can reenter it).
            self.random_seed = long(time.time())
        else:
            # Convert the seed to a long if we can
            try:
                self.random_seed = long(self.random_seed)
            except (TypeError, ValueError):
                # Not numeric: keep the seed as given.
                pass
        return self.random_seed
3367
class TestFirstDecorator(TestDecorator):
    """A decorator which moves named tests to the front."""

    def __init__(self, suite, pattern):
        TestDecorator.__init__(self, suite)
        self.pattern = pattern
        # Set once the tests have been reordered.
        self.filtered = False

    def __iter__(self):
        """Iterate the tests, reordering them on first use only."""
        if self.filtered:
            return iter(self._tests)
        # Flag first: split_suite_by_re() iterates self again.
        self.filtered = True
        # (matching, not matching) pair of suites, added in that order.
        suites = split_suite_by_re(self, self.pattern)
        del self._tests[:]
        self.addTests(suites)
        return iter(self._tests)
3385
def partition_tests(suite, count):
    """Partition suite into count lists of tests.

    :param suite: The suite whose flattened tests are distributed.
    :param count: The number of partitions to build.
    :return: A list of count lists of tests.
    """
    # This just assigns tests in a round-robin fashion.  On one hand this
    # splits up blocks of related tests that might run faster if they shared
    # resources, but on the other it avoids assigning blocks of slow tests to
    # just one partition.  So the slowest partition shouldn't be much slower
    # than the fastest.
    partitions = [list() for i in range(count)]
    if not partitions:
        # Nothing to distribute into; also avoids a modulo by zero below.
        return partitions
    for index, test in enumerate(iter_suite_tests(suite)):
        partitions[index % count].append(test)
    return partitions
3399
def workaround_zealous_crypto_random():
    """Crypto.Random want to help us being secure, but we don't care here.

    This workaround some test failure related to the sftp server. Once paramiko
    stop using the controversial API in Crypto.Random, we may get rid of it.

    Nothing is done when Crypto.Random cannot be imported.
    """
    try:
        from Crypto.Random import atfork
        atfork()
    except ImportError:
        pass
3412
def fork_for_tests(suite):
    """Take suite and start up one runner per CPU by forking()

    Each child runs its share of the tests and reports through a pipe using
    the subunit protocol; the parent gets one proxy test per child.

    :return: An iterable of TestCase-like objects which can each have
        run(result) called on them to feed tests to result.
    """
    concurrency = osutils.local_concurrency()
    result = []
    from subunit import TestProtocolClient, ProtocolTestCase
    from subunit.test_results import AutoTimingTestResultDecorator
    class TestInOtherProcess(ProtocolTestCase):
        # Should be in subunit, I think. RBC.
        def __init__(self, stream, pid):
            ProtocolTestCase.__init__(self, stream)
            self.pid = pid

        def run(self, result):
            try:
                ProtocolTestCase.run(self, result)
            finally:
                # Always reap the child once its stream has been consumed.
                os.waitpid(self.pid, 0)

    test_blocks = partition_tests(suite, concurrency)
    for process_tests in test_blocks:
        process_suite = TestUtil.TestSuite()
        process_suite.addTests(process_tests)
        c2pread, c2pwrite = os.pipe()
        pid = os.fork()
        if pid == 0:
            # Child process.
            workaround_zealous_crypto_random()
            try:
                os.close(c2pread)
                # Leave stderr and stdout open so we can see test noise
                # Close stdin so that the child goes away if it decides to
                # read from stdin (otherwise its a roulette to see what
                # child actually gets keystrokes for pdb etc).
                sys.stdin.close()
                sys.stdin = None
                stream = os.fdopen(c2pwrite, 'wb', 1)
                subunit_result = AutoTimingTestResultDecorator(
                    TestProtocolClient(stream))
                process_suite.run(subunit_result)
            finally:
                # Never return into the parent's code path from the child.
                os._exit(0)
        else:
            # Parent process: keep only the read end of the pipe.
            os.close(c2pwrite)
            stream = os.fdopen(c2pread, 'rb', 1)
            test = TestInOtherProcess(stream, pid)
            result.append(test)
    return result
3464
def reinvoke_for_tests(suite):
    """Take suite and start up one runner per CPU using subprocess().

    Each subprocess is a 'bzr selftest --load-list FILE --subunit' run whose
    test id list is written to a temporary file.

    :return: An iterable of TestCase-like objects which can each have
        run(result) called on them to feed tests to result.
    """
    concurrency = osutils.local_concurrency()
    result = []
    from subunit import ProtocolTestCase
    class TestInSubprocess(ProtocolTestCase):
        def __init__(self, process, name):
            ProtocolTestCase.__init__(self, process.stdout)
            self.process = process
            self.process.stdin.close()
            # Path of the test list file, removed once the run is over.
            self.name = name

        def run(self, result):
            try:
                ProtocolTestCase.run(self, result)
            finally:
                self.process.wait()
                os.unlink(self.name)
            # print "pid %d finished" % finished_process
    test_blocks = partition_tests(suite, concurrency)
    for process_tests in test_blocks:
        # ugly; currently reimplement rather than reuses TestCase methods.
        bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
        if not os.path.isfile(bzr_path):
            # We are probably installed. Assume sys.argv is the right file
            bzr_path = sys.argv[0]
        bzr_path = [bzr_path]
        if sys.platform == "win32":
            # if we're on windows, we can't execute the bzr script directly
            bzr_path = [sys.executable] + bzr_path
        fd, test_list_file_name = tempfile.mkstemp()
        test_list_file = os.fdopen(fd, 'wb', 1)
        for test in process_tests:
            test_list_file.write(test.id() + '\n')
        test_list_file.close()
        try:
            argv = bzr_path + ['selftest', '--load-list', test_list_file_name,
                '--subunit']
            if '--no-plugins' in sys.argv:
                argv.append('--no-plugins')
            # stderr=subprocess.STDOUT would be ideal, but until we prevent
            # noise on stderr it can interrupt the subunit protocol.
            process = subprocess.Popen(argv, stdin=subprocess.PIPE,
                                       stdout=subprocess.PIPE,
                                       stderr=subprocess.PIPE,
                                       bufsize=1)
            test = TestInSubprocess(process, test_list_file_name)
            result.append(test)
        except:
            # Clean up the list file, then let the error propagate.
            os.unlink(test_list_file_name)
            raise
    return result
3522
class ProfileResult(testtools.ExtendedToOriginalDecorator):
    """Generate profiling data for all activity between start and success.

    The profile data is appended to the test's _benchcalls attribute and can
    be accessed by the forwarded-to TestResult.

    While it might be cleaner to accumulate this in stopTest, addSuccess is
    where our existing output support for lsprof is, and this class aims to
    fit in with that: while it could be moved it's not necessary to accomplish
    test profiling, nor would it be dramatically cleaner.
    """

    def startTest(self, test):
        """Start a fresh profiler, then forward startTest."""
        self.profiler = bzrlib.lsprof.BzrProfiler()
        # Prevent deadlocks in tests that use lsprof: those tests will
        # unblock it.
        bzrlib.lsprof.BzrProfiler.profiler_block = 0
        self.profiler.start()
        testtools.ExtendedToOriginalDecorator.startTest(self, test)

    def addSuccess(self, test):
        """Stop the profiler, record its stats on test, forward addSuccess."""
        stats = self.profiler.stop()
        try:
            calls = test._benchcalls
        except AttributeError:
            # First profile recorded for this test.
            test._benchcalls = []
            calls = test._benchcalls
        calls.append(((test.id(), "", ""), stats))
        testtools.ExtendedToOriginalDecorator.addSuccess(self, test)

    def stopTest(self, test):
        """Forward stopTest, then drop the profiler."""
        testtools.ExtendedToOriginalDecorator.stopTest(self, test)
        self.profiler = None
3557
# Controlled by "bzr selftest -E=..." option
# Currently supported:
#   -Eallow_debug           Will no longer clear debug.debug_flags() so it
#                           preserves any flags supplied at the command line.
#   -Edisable_lock_checks   Turns errors in mismatched locks into simple prints
#                           rather than failing tests. And no longer raise
#                           LockContention when fcntl locks are not being used
#                           with proper exclusion rules.
#   -Ethreads               Will display thread ident at creation/join time to
#                           help track thread leaks
#
#   -Econfig_stats          Will collect statistics using addDetail
selftest_debug_flags = set()
3572
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
             transport=None,
             test_suite_factory=None,
             lsprof_timed=None,
             bench_history=None,
             matching_tests_first=None,
             list_only=False,
             random_seed=None,
             exclude_pattern=None,
             strict=False,
             load_list=None,
             debug_flags=None,
             starting_with=None,
             runner_class=None,
             suite_decorators=None,
             stream=None,
             lsprof_tests=False,
             ):
    """Run the whole test suite under the enhanced runner

    The module globals default_transport and selftest_debug_flags are
    replaced for the duration of the run and restored afterwards, even when
    the run raises.

    :param transport: Becomes default_transport during the run; the current
        default_transport is kept when None.
    :param test_suite_factory: Callable building the suite; test_suite() is
        used when None.
    :param load_list: Name of a file of test ids (see load_test_id_list())
        limiting the tests loaded by test_suite().
    :param debug_flags: When not None, replaces selftest_debug_flags during
        the run.
    :param starting_with: Test id prefixes; aliases are resolved through
        test_prefix_alias_registry and the suite is filtered on them.
    :param lsprof_tests: When true, ProfileResult decorates the result.
    :return: The value returned by run_suite(), which receives the remaining
        parameters unchanged.
    """
    # XXX: Very ugly way to do this...
    # Disable warning about old formats because we don't want it to disturb
    # any blackbox tests.
    from bzrlib import repository
    repository._deprecation_warning_done = True

    global default_transport
    if transport is None:
        transport = default_transport
    old_transport = default_transport
    default_transport = transport
    global selftest_debug_flags
    old_debug_flags = selftest_debug_flags
    if debug_flags is not None:
        selftest_debug_flags = set(debug_flags)
    try:
        if load_list is None:
            keep_only = None
        else:
            keep_only = load_test_id_list(load_list)
        if starting_with:
            starting_with = [test_prefix_alias_registry.resolve_alias(start)
                             for start in starting_with]
        if test_suite_factory is None:
            # Reduce loading time by loading modules based on the starting_with
            # patterns.
            suite = test_suite(keep_only, starting_with)
        else:
            suite = test_suite_factory()
        if starting_with:
            # But always filter as requested.
            suite = filter_suite_by_id_startswith(suite, starting_with)
        result_decorators = []
        if lsprof_tests:
            result_decorators.append(ProfileResult)
        return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
                     stop_on_failure=stop_on_failure,
                     transport=transport,
                     lsprof_timed=lsprof_timed,
                     bench_history=bench_history,
                     matching_tests_first=matching_tests_first,
                     list_only=list_only,
                     random_seed=random_seed,
                     exclude_pattern=exclude_pattern,
                     strict=strict,
                     runner_class=runner_class,
                     suite_decorators=suite_decorators,
                     stream=stream,
                     result_decorators=result_decorators,
                     )
    finally:
        default_transport = old_transport
        selftest_debug_flags = old_debug_flags
3646
def load_test_id_list(file_name):
    """Load a test id list from a text file.

    The format is one test id by line.  No special care is taken to impose
    strict rules, these test ids are used to filter the test suite so a test id
    that do not match an existing test will do no harm. This allows user to add
    comments, leave blank lines, etc.

    :param file_name: Path of the file to read.
    :return: The list of the lines of the file, each stripped of its
        surrounding whitespace.
    :raises errors.NoSuchFile: if the file does not exist; any other IOError
        raised while opening it is propagated unchanged.
    """
    try:
        ftest = open(file_name, 'rt')
    except IOError as e:
        if e.errno != errno.ENOENT:
            raise
        raise errors.NoSuchFile(file_name)
    try:
        return [test_name.strip() for test_name in ftest]
    finally:
        # Close the file even if reading fails.
        ftest.close()
3669
def suite_matches_id_list(test_suite, id_list):
    """Warns about tests not appearing or appearing more than once.

    :param test_suite: A TestSuite object.
    :param id_list: The list of test ids that should be found in
         test_suite.

    :return: (absents, duplicates) absents is a list containing the test found
        in id_list but not in test_suite, duplicates is a list containing the
        test found multiple times in test_suite.

    When using a predefined test id list, it may occur that some tests do not
    exist anymore or that some tests use the same id. This function warns the
    tester about potential problems in his workflow (test lists are volatile)
    or in the test suite itself (using the same id for several tests does not
    help to localize defects).
    """
    # Build a dict counting id occurrences
    tests = dict()
    for test in iter_suite_tests(test_suite):
        test_id = test.id()
        tests[test_id] = tests.get(test_id, 0) + 1

    not_found = []
    duplicates = []
    for test_id in id_list:
        occurs = tests.get(test_id, 0)
        if not occurs:
            not_found.append(test_id)
        elif occurs > 1:
            duplicates.append(test_id)

    return not_found, duplicates
3704
class TestIdList(object):
    """Test id list to filter a test suite.

    Relying on the assumption that test ids are built as:
    <module>[.<class>.<method>][(<param>+)], <module> being in python dotted
    notation, this class offers methods to :
    - avoid building a test suite for modules not referred to in the test list,
    - keep only the tests listed from the module test suite.
    """

    def __init__(self, test_id_list):
        # When a test suite needs to be filtered against us we compare test ids
        # for equality, so a simple dict offers a quick and simple solution.
        self.tests = dict.fromkeys(test_id_list, True)

        # While unittest.TestCase have ids like:
        # <module>.<class>.<method>[(<param+)],
        # doctest.DocTestCase can have ids like:
        # <module>
        # <module>.<class>
        # <module>.<function>
        # <module>.<class>.<method>

        # Since we can't predict a test class from its name only, we settle on
        # a simple constraint: a test id always begins with its module name.

        # Record every dotted prefix of every id as a potential module name.
        modules = {}
        for test_id in test_id_list:
            parts = test_id.split('.')
            mod_name = parts.pop(0)
            modules[mod_name] = True
            for part in parts:
                mod_name += '.' + part
                modules[mod_name] = True
        self.modules = modules

    def refers_to(self, module_name):
        """Is there tests for the module or one of its sub modules."""
        return module_name in self.modules

    def includes(self, test_id):
        """Is test_id one of the ids given at construction."""
        return test_id in self.tests
3748
class TestPrefixAliasRegistry(registry.Registry):
    """A registry for test prefix aliases.

    This helps implement shortcuts for the --starting-with selftest
    option. Overriding existing prefixes is not allowed but not fatal (a
    warning will be emitted).
    """

    def register(self, key, obj, help=None, info=None,
                 override_existing=False):
        """See Registry.register.

        Trying to override an existing alias causes a warning to be emitted,
        not a fatal exception. The override_existing argument is ignored:
        False is always passed to Registry.register.
        """
        try:
            super(TestPrefixAliasRegistry, self).register(
                key, obj, help=help, info=info, override_existing=False)
        except KeyError:
            actual = self.get(key)
            trace.note(
                'Test prefix alias %s is already used for %s, ignoring %s'
                % (key, actual, obj))

    def resolve_alias(self, id_start):
        """Replace the alias by the prefix in the given string.

        Using an unknown prefix is an error to help catching typos.

        :param id_start: A dotted string whose first component is an alias.
        :return: id_start with its first component replaced by the
            registered prefix.
        :raises errors.BzrCommandError: if the first component is not a
            registered alias.
        """
        parts = id_start.split('.')
        try:
            parts[0] = self.get(parts[0])
        except KeyError:
            raise errors.BzrCommandError(
                '%s is not a known test prefix alias' % parts[0])
        return '.'.join(parts)
3786
test_prefix_alias_registry = TestPrefixAliasRegistry()
"""Registry of test prefix aliases."""


# This alias allows to detect typos ('bzrlin.') by making all valid test ids
# appear prefixed ('bzrlib.' is "replaced" by 'bzrlib.').
test_prefix_alias_registry.register('bzrlib', 'bzrlib')

# Obvious highest levels prefixes, feel free to add your own via a plugin
test_prefix_alias_registry.register('bd', 'bzrlib.doc')
test_prefix_alias_registry.register('bu', 'bzrlib.utils')
test_prefix_alias_registry.register('bt', 'bzrlib.tests')
test_prefix_alias_registry.register('bb', 'bzrlib.tests.blackbox')
test_prefix_alias_registry.register('bp', 'bzrlib.plugins')
3802
def _test_suite_testmod_names():
3803
"""Return the standard list of test module names to test."""
3806
'bzrlib.tests.blackbox',
3807
'bzrlib.tests.commands',
3808
'bzrlib.tests.doc_generate',
3809
'bzrlib.tests.per_branch',
3810
'bzrlib.tests.per_bzrdir',
3811
'bzrlib.tests.per_controldir',
3812
'bzrlib.tests.per_controldir_colo',
3813
'bzrlib.tests.per_foreign_vcs',
3814
'bzrlib.tests.per_interrepository',
3815
'bzrlib.tests.per_intertree',
3816
'bzrlib.tests.per_inventory',
3817
'bzrlib.tests.per_interbranch',
3818
'bzrlib.tests.per_lock',
3819
'bzrlib.tests.per_merger',
3820
'bzrlib.tests.per_transport',
3821
'bzrlib.tests.per_tree',
3822
'bzrlib.tests.per_pack_repository',
3823
'bzrlib.tests.per_repository',
3824
'bzrlib.tests.per_repository_chk',
3825
'bzrlib.tests.per_repository_reference',
3826
'bzrlib.tests.per_repository_vf',
3827
'bzrlib.tests.per_uifactory',
3828
'bzrlib.tests.per_versionedfile',
3829
'bzrlib.tests.per_workingtree',
3830
'bzrlib.tests.test__annotator',
3831
'bzrlib.tests.test__bencode',
3832
'bzrlib.tests.test__btree_serializer',
3833
'bzrlib.tests.test__chk_map',
3834
'bzrlib.tests.test__dirstate_helpers',
3835
'bzrlib.tests.test__groupcompress',
3836
'bzrlib.tests.test__known_graph',
3837
'bzrlib.tests.test__rio',
3838
'bzrlib.tests.test__simple_set',
3839
'bzrlib.tests.test__static_tuple',
3840
'bzrlib.tests.test__walkdirs_win32',
3841
'bzrlib.tests.test_ancestry',
3842
'bzrlib.tests.test_annotate',
3843
'bzrlib.tests.test_api',
3844
'bzrlib.tests.test_atomicfile',
3845
'bzrlib.tests.test_bad_files',
3846
'bzrlib.tests.test_bisect_multi',
3847
'bzrlib.tests.test_branch',
3848
'bzrlib.tests.test_branchbuilder',
3849
'bzrlib.tests.test_btree_index',
3850
'bzrlib.tests.test_bugtracker',
3851
'bzrlib.tests.test_bundle',
3852
'bzrlib.tests.test_bzrdir',
3853
'bzrlib.tests.test__chunks_to_lines',
3854
'bzrlib.tests.test_cache_utf8',
3855
'bzrlib.tests.test_chk_map',
3856
'bzrlib.tests.test_chk_serializer',
3857
'bzrlib.tests.test_chunk_writer',
3858
'bzrlib.tests.test_clean_tree',
3859
'bzrlib.tests.test_cleanup',
3860
'bzrlib.tests.test_cmdline',
3861
'bzrlib.tests.test_commands',
3862
'bzrlib.tests.test_commit',
3863
'bzrlib.tests.test_commit_merge',
3864
'bzrlib.tests.test_config',
3865
'bzrlib.tests.test_conflicts',
3866
'bzrlib.tests.test_controldir',
3867
'bzrlib.tests.test_counted_lock',
3868
'bzrlib.tests.test_crash',
3869
'bzrlib.tests.test_decorators',
3870
'bzrlib.tests.test_delta',
3871
'bzrlib.tests.test_debug',
3872
'bzrlib.tests.test_diff',
3873
'bzrlib.tests.test_directory_service',
3874
'bzrlib.tests.test_dirstate',
3875
'bzrlib.tests.test_email_message',
3876
'bzrlib.tests.test_eol_filters',
3877
'bzrlib.tests.test_errors',
3878
'bzrlib.tests.test_export',
3879
'bzrlib.tests.test_export_pot',
3880
'bzrlib.tests.test_extract',
3881
'bzrlib.tests.test_fetch',
3882
'bzrlib.tests.test_fixtures',
3883
'bzrlib.tests.test_fifo_cache',
3884
'bzrlib.tests.test_filters',
3885
'bzrlib.tests.test_ftp_transport',
3886
'bzrlib.tests.test_foreign',
3887
'bzrlib.tests.test_generate_docs',
3888
'bzrlib.tests.test_generate_ids',
3889
'bzrlib.tests.test_globbing',
3890
'bzrlib.tests.test_gpg',
3891
'bzrlib.tests.test_graph',
3892
'bzrlib.tests.test_groupcompress',
3893
'bzrlib.tests.test_hashcache',
3894
'bzrlib.tests.test_help',
3895
'bzrlib.tests.test_hooks',
3896
'bzrlib.tests.test_http',
3897
'bzrlib.tests.test_http_response',
3898
'bzrlib.tests.test_https_ca_bundle',
3899
'bzrlib.tests.test_i18n',
3900
'bzrlib.tests.test_identitymap',
3901
'bzrlib.tests.test_ignores',
3902
'bzrlib.tests.test_index',
3903
'bzrlib.tests.test_import_tariff',
3904
'bzrlib.tests.test_info',
3905
'bzrlib.tests.test_inv',
3906
'bzrlib.tests.test_inventory_delta',
3907
'bzrlib.tests.test_knit',
3908
'bzrlib.tests.test_lazy_import',
3909
'bzrlib.tests.test_lazy_regex',
3910
'bzrlib.tests.test_library_state',
3911
'bzrlib.tests.test_lock',
3912
'bzrlib.tests.test_lockable_files',
3913
'bzrlib.tests.test_lockdir',
3914
'bzrlib.tests.test_log',
3915
'bzrlib.tests.test_lru_cache',
3916
'bzrlib.tests.test_lsprof',
3917
'bzrlib.tests.test_mail_client',
3918
'bzrlib.tests.test_matchers',
3919
'bzrlib.tests.test_memorytree',
3920
'bzrlib.tests.test_merge',
3921
'bzrlib.tests.test_merge3',
3922
'bzrlib.tests.test_merge_core',
3923
'bzrlib.tests.test_merge_directive',
3924
'bzrlib.tests.test_mergetools',
3925
'bzrlib.tests.test_missing',
3926
'bzrlib.tests.test_msgeditor',
3927
'bzrlib.tests.test_multiparent',
3928
'bzrlib.tests.test_mutabletree',
3929
'bzrlib.tests.test_nonascii',
3930
'bzrlib.tests.test_options',
3931
'bzrlib.tests.test_osutils',
3932
'bzrlib.tests.test_osutils_encodings',
3933
'bzrlib.tests.test_pack',
3934
'bzrlib.tests.test_patch',
3935
'bzrlib.tests.test_patches',
3936
'bzrlib.tests.test_permissions',
3937
'bzrlib.tests.test_plugins',
3938
'bzrlib.tests.test_progress',
3939
'bzrlib.tests.test_pyutils',
3940
'bzrlib.tests.test_read_bundle',
3941
'bzrlib.tests.test_reconcile',
3942
'bzrlib.tests.test_reconfigure',
3943
'bzrlib.tests.test_registry',
3944
'bzrlib.tests.test_remote',
3945
'bzrlib.tests.test_rename_map',
3946
'bzrlib.tests.test_repository',
3947
'bzrlib.tests.test_revert',
3948
'bzrlib.tests.test_revision',
3949
'bzrlib.tests.test_revisionspec',
3950
'bzrlib.tests.test_revisiontree',
3951
'bzrlib.tests.test_rio',
3952
'bzrlib.tests.test_rules',
3953
'bzrlib.tests.test_sampler',
3954
'bzrlib.tests.test_scenarios',
3955
'bzrlib.tests.test_script',
3956
'bzrlib.tests.test_selftest',
3957
'bzrlib.tests.test_serializer',
3958
'bzrlib.tests.test_setup',
3959
'bzrlib.tests.test_sftp_transport',
3960
'bzrlib.tests.test_shelf',
3961
'bzrlib.tests.test_shelf_ui',
3962
'bzrlib.tests.test_smart',
3963
'bzrlib.tests.test_smart_add',
3964
'bzrlib.tests.test_smart_request',
3965
'bzrlib.tests.test_smart_transport',
3966
'bzrlib.tests.test_smtp_connection',
3967
'bzrlib.tests.test_source',
3968
'bzrlib.tests.test_ssh_transport',
3969
'bzrlib.tests.test_status',
3970
'bzrlib.tests.test_store',
3971
'bzrlib.tests.test_strace',
3972
'bzrlib.tests.test_subsume',
3973
'bzrlib.tests.test_switch',
3974
'bzrlib.tests.test_symbol_versioning',
3975
'bzrlib.tests.test_tag',
3976
'bzrlib.tests.test_test_server',
3977
'bzrlib.tests.test_testament',
3978
'bzrlib.tests.test_textfile',
3979
'bzrlib.tests.test_textmerge',
3980
'bzrlib.tests.test_cethread',
3981
'bzrlib.tests.test_timestamp',
3982
'bzrlib.tests.test_trace',
3983
'bzrlib.tests.test_transactions',
3984
'bzrlib.tests.test_transform',
3985
'bzrlib.tests.test_transport',
3986
'bzrlib.tests.test_transport_log',
3987
'bzrlib.tests.test_tree',
3988
'bzrlib.tests.test_treebuilder',
3989
'bzrlib.tests.test_treeshape',
3990
'bzrlib.tests.test_tsort',
3991
'bzrlib.tests.test_tuned_gzip',
3992
'bzrlib.tests.test_ui',
3993
'bzrlib.tests.test_uncommit',
3994
'bzrlib.tests.test_upgrade',
3995
'bzrlib.tests.test_upgrade_stacked',
3996
'bzrlib.tests.test_urlutils',
3997
'bzrlib.tests.test_utextwrap',
3998
'bzrlib.tests.test_version',
3999
'bzrlib.tests.test_version_info',
4000
'bzrlib.tests.test_versionedfile',
4001
'bzrlib.tests.test_weave',
4002
'bzrlib.tests.test_whitebox',
4003
'bzrlib.tests.test_win32utils',
4004
'bzrlib.tests.test_workingtree',
4005
'bzrlib.tests.test_workingtree_4',
4006
'bzrlib.tests.test_wsgi',
4007
'bzrlib.tests.test_xml',
4011
def _test_suite_modules_to_doctest():
4012
"""Return the list of modules to doctest."""
4014
# GZ 2009-03-31: No docstrings with -OO so there's nothing to doctest
4018
'bzrlib.branchbuilder',
4019
'bzrlib.decorators',
4021
'bzrlib.iterablefile',
4026
'bzrlib.symbol_versioning',
4028
'bzrlib.tests.fixtures',
4030
'bzrlib.transport.http',
4031
'bzrlib.version_info_formats.format_custom',
4035
def test_suite(keep_only=None, starting_with=None):
4036
"""Build and return TestSuite for the whole of bzrlib.
4038
:param keep_only: A list of test ids limiting the suite returned.
4040
:param starting_with: An id limiting the suite returned to the tests
4043
This function can be replaced if you need to change the default test
4044
suite on a global basis, but it is not encouraged.
4047
loader = TestUtil.TestLoader()
4049
if keep_only is not None:
4050
id_filter = TestIdList(keep_only)
4052
# We take precedence over keep_only because *at loading time* using
4053
# both options means we will load less tests for the same final result.
4054
def interesting_module(name):
4055
for start in starting_with:
4057
# Either the module name starts with the specified string
4058
name.startswith(start)
4059
# or it may contain tests starting with the specified string
4060
or start.startswith(name)
4064
loader = TestUtil.FilteredByModuleTestLoader(interesting_module)
4066
elif keep_only is not None:
4067
loader = TestUtil.FilteredByModuleTestLoader(id_filter.refers_to)
4068
def interesting_module(name):
4069
return id_filter.refers_to(name)
4072
loader = TestUtil.TestLoader()
4073
def interesting_module(name):
4074
# No filtering, all modules are interesting
4077
suite = loader.suiteClass()
4079
# modules building their suite with loadTestsFromModuleNames
4080
suite.addTest(loader.loadTestsFromModuleNames(_test_suite_testmod_names()))
4082
for mod in _test_suite_modules_to_doctest():
4083
if not interesting_module(mod):
4084
# No tests to keep here, move along
4087
# note that this really does mean "report only" -- doctest
4088
# still runs the rest of the examples
4089
doc_suite = IsolatedDocTestSuite(
4090
mod, optionflags=doctest.REPORT_ONLY_FIRST_FAILURE)
4091
except ValueError, e:
4092
print '**failed to get doctest for: %s\n%s' % (mod, e)
4094
if len(doc_suite._tests) == 0:
4095
raise errors.BzrError("no doctests found in %s" % (mod,))
4096
suite.addTest(doc_suite)
4098
default_encoding = sys.getdefaultencoding()
4099
for name, plugin in _mod_plugin.plugins().items():
4100
if not interesting_module(plugin.module.__name__):
4102
plugin_suite = plugin.test_suite()
4103
# We used to catch ImportError here and turn it into just a warning,
4104
# but really if you don't have --no-plugins this should be a failure.
4105
# mbp 20080213 - see http://bugs.launchpad.net/bugs/189771
4106
if plugin_suite is None:
4107
plugin_suite = plugin.load_plugin_tests(loader)
4108
if plugin_suite is not None:
4109
suite.addTest(plugin_suite)
4110
if default_encoding != sys.getdefaultencoding():
4112
'Plugin "%s" tried to reset default encoding to: %s', name,
4113
sys.getdefaultencoding())
4115
sys.setdefaultencoding(default_encoding)
4117
if keep_only is not None:
4118
# Now that the referred modules have loaded their tests, keep only the
4120
suite = filter_suite_by_id_list(suite, id_filter)
4121
# Do some sanity checks on the id_list filtering
4122
not_found, duplicates = suite_matches_id_list(suite, keep_only)
4124
# The tester has used both keep_only and starting_with, so he is
4125
# already aware that some tests are excluded from the list, there
4126
# is no need to tell him which.
4129
# Some tests mentioned in the list are not in the test suite. The
4130
# list may be out of date, report to the tester.
4131
for id in not_found:
4132
trace.warning('"%s" not found in the test suite', id)
4133
for id in duplicates:
4134
trace.warning('"%s" is used as an id by several tests', id)
4139
def multiply_scenarios(*scenarios):
4140
"""Multiply two or more iterables of scenarios.
4142
It is safe to pass scenario generators or iterators.
4144
:returns: A list of compound scenarios: the cross-product of all
4145
scenarios, with the names concatenated and the parameters
4148
return reduce(_multiply_two_scenarios, map(list, scenarios))
4151
def _multiply_two_scenarios(scenarios_left, scenarios_right):
4152
"""Multiply two sets of scenarios.
4154
:returns: the cartesian product of the two sets of scenarios, that is
4155
a scenario for every possible combination of a left scenario and a
4159
('%s,%s' % (left_name, right_name),
4160
dict(left_dict.items() + right_dict.items()))
4161
for left_name, left_dict in scenarios_left
4162
for right_name, right_dict in scenarios_right]
4165
def multiply_tests(tests, scenarios, result):
    """Multiply tests_list by scenarios into result.

    This is the core workhorse for test parameterisation.

    Typically the load_tests() method for a per-implementation test suite will
    call multiply_tests and return the result.

    :param tests: The tests to parameterise.
    :param scenarios: The scenarios to apply: pairs of (scenario_name,
        scenario_param_dict).
    :param result: A TestSuite to add created tests to.

    This returns the passed in result TestSuite with the cross product of all
    the tests repeated once for each scenario. Each test is adapted by adding
    the scenario name at the end of its id(), and updating the test object's
    __dict__ with the scenario_param_dict.

    >>> import bzrlib.tests.test_sampler
    >>> r = multiply_tests(
    ...     bzrlib.tests.test_sampler.DemoTest('test_nothing'),
    ...     [('one', dict(param=1)),
    ...      ('two', dict(param=2))],
    ...     TestUtil.TestSuite())
    >>> tests = list(iter_suite_tests(r))
    >>> len(tests)
    2
    >>> tests[0].id()
    'bzrlib.tests.test_sampler.DemoTest.test_nothing(one)'
    >>> tests[0].param
    1
    >>> tests[1].param
    2
    """
    # iter_suite_tests flattens nested suites, so each individual test is
    # expanded separately.
    for single_test in iter_suite_tests(tests):
        apply_scenarios(single_test, scenarios, result)
    return result
4204
def apply_scenarios(test, scenarios, result):
    """Apply the scenarios in scenarios to test and add to result.

    :param test: The test to apply scenarios to.
    :param scenarios: An iterable of scenarios to apply to test.
    :param result: The suite receiving one adapted copy of test per
        scenario, through its addTest() method.
    :return: result
    :seealso: apply_scenario
    """
    add_test = result.addTest
    for scenario in scenarios:
        add_test(apply_scenario(test, scenario))
    return result
4217
def apply_scenario(test, scenario):
    """Copy test and apply scenario to it.

    :param test: A test to adapt.
    :param scenario: A tuple describing the scenario.
        The first element of the tuple is the name appended, between
        parentheses, to the id of the test.
        The second element is a dict containing attributes to set on the
        test.
    :return: The adapted test.
    """
    scenario_name = scenario[0]
    attributes = scenario[1]
    adapted = clone_test(test, "%s(%s)" % (test.id(), scenario_name))
    for name, value in attributes.items():
        setattr(adapted, name, value)
    return adapted
4234
def clone_test(test, new_id):
    """Clone a test giving it a new id.

    :param test: The test to clone.
    :param new_id: The id to assign to it.
    :return: The new test, a shallow copy of test whose id() returns new_id.
    """
    new_test = copy.copy(test)
    new_test.id = lambda: new_id
    # XXX: Workaround <https://bugs.launchpad.net/testtools/+bug/637725>, which
    # causes cloned tests to share the 'details' dict. This makes it hard to
    # read the test output for parameterized tests, because tracebacks will be
    # associated with irrelevant tests.
    if hasattr(new_test, '_TestCase__details'):
        # Reset the '__details' dict.
        new_test._TestCase__details = {}
    # Otherwise this must be a different version of testtools than
    # expected: do nothing.
    return new_test
4258
def permute_tests_for_extension(standard_tests, loader, py_module_name,
                                ext_module_name):
    """Helper for permutating tests against an extension module.

    This is meant to be used inside a modules 'load_tests()' function. It will
    create 2 scenarios, and cause all tests in the 'standard_tests' to be run
    against both implementations. Setting 'test.module' to the appropriate
    module. See bzrlib.tests.test__chk_map.load_tests as an example.

    :param standard_tests: A test suite to permute
    :param loader: A TestLoader
    :param py_module_name: The python path to a python module that can always
        be loaded, and will be considered the 'python' implementation. (eg
        'bzrlib._chk_map_py')
    :param ext_module_name: The python path to an extension module. If the
        module cannot be loaded, a single test will be added, which notes that
        the module is not available. If it can be loaded, all standard_tests
        will be run against that module.
    :return: (suite, feature) suite is a test-suite that has all the permuted
        tests. feature is the Feature object that can be used to determine if
        the module is available.
    """
    python_scenario = ('python',
                       {'module': pyutils.get_named_object(py_module_name)})
    scenarios = [python_scenario]
    suite = loader.suiteClass()
    feature = ModuleAvailableFeature(ext_module_name)
    if not feature.available():
        # the compiled module isn't available, so we add a failing test
        class FailWithoutFeature(TestCase):
            def test_fail(self):
                self.requireFeature(feature)
        suite.addTest(loader.loadTestsFromTestCase(FailWithoutFeature))
    else:
        scenarios.append(('C', {'module': feature.module}))
    return multiply_tests(standard_tests, scenarios, suite), feature
4299
def _rmtree_temp_dir(dirname, test_id=None):
4300
# If LANG=C we probably have created some bogus paths
4301
# which rmtree(unicode) will fail to delete
4302
# so make sure we are using rmtree(str) to delete everything
4303
# except on win32, where rmtree(str) will fail
4304
# since it doesn't have the property of byte-stream paths
4305
# (they are either ascii or mbcs)
4306
if sys.platform == 'win32':
4307
# make sure we are using the unicode win32 api
4308
dirname = unicode(dirname)
4310
dirname = dirname.encode(sys.getfilesystemencoding())
4312
osutils.rmtree(dirname)
4314
# We don't want to fail here because some useful display will be lost
4315
# otherwise. Polluting the tmp dir is bad, but not giving all the
4316
# possible info to the test runner is even worse.
4318
ui.ui_factory.clear_term()
4319
sys.stderr.write('\nWhile running: %s\n' % (test_id,))
4320
# Ugly, but the last thing we want here is fail, so bear with it.
4321
printable_e = str(e).decode(osutils.get_user_encoding(), 'replace'
4322
).encode('ascii', 'replace')
4323
sys.stderr.write('Unable to remove testing dir %s\n%s'
4324
% (os.path.basename(dirname), printable_e))
4327
class Feature(object):
    """An operating system Feature.

    Concrete features implement _probe(); they may also define
    feature_name() to control how the feature is displayed.
    """

    def __init__(self):
        # Result of the last probe, None until a probe has returned
        # something else.
        self._available = None

    def available(self):
        """Is the feature available?

        The value returned by _probe() is remembered, so the probe is not
        repeated once it has returned something other than None.

        :return: True if the feature is available.
        """
        cached = self._available
        if cached is None:
            cached = self._available = self._probe()
        return cached

    def _probe(self):
        """Implement this method in concrete features.

        :return: True if the feature is available.
        """
        raise NotImplementedError

    def __str__(self):
        # Prefer the name given by the concrete feature, else the class name.
        if not getattr(self, 'feature_name', None):
            return self.__class__.__name__
        return self.feature_name()
4355
class _SymlinkFeature(Feature):
    """Are symbolic links supported, according to osutils.has_symlinks()?"""

    def feature_name(self):
        return 'symlinks'

    def _probe(self):
        return osutils.has_symlinks()


SymlinkFeature = _SymlinkFeature()
4366
class _HardlinkFeature(Feature):
    """Are hard links supported, according to osutils.has_hardlinks()?"""

    def feature_name(self):
        return 'hardlinks'

    def _probe(self):
        return osutils.has_hardlinks()


HardlinkFeature = _HardlinkFeature()
4377
class _OsFifoFeature(Feature):
    """Does the os module provide mkfifo?"""

    def feature_name(self):
        return 'filesystem fifos'

    def _probe(self):
        # Hands back os.mkfifo itself (a true value) when it exists and None
        # when it does not, rather than a strict boolean.
        return getattr(os, 'mkfifo', None)


OsFifoFeature = _OsFifoFeature()
4388
class _UnicodeFilenameFeature(Feature):
    """Does the filesystem support Unicode filenames?"""

    def _probe(self):
        try:
            # Check for character combinations unlikely to be covered by any
            # single non-unicode encoding. We use the characters
            # - greek small letter alpha (U+03B1) and
            # - braille pattern dots-123456 (U+283F).
            os.stat(u'\u03b1\u283f')
        except UnicodeEncodeError:
            # The name cannot even be encoded for the filesystem.
            return False
        except (IOError, OSError):
            # The filesystem allows the Unicode filename but the file doesn't
            # exist.
            pass
        # Either the file does not exist, or the filesystem allows the
        # Unicode filename and the file exists for some reason.
        return True


UnicodeFilenameFeature = _UnicodeFilenameFeature()
4412
class _CompatabilityThunkFeature(Feature):
    """This feature is just a thunk to another feature.

    It issues a deprecation warning if it is accessed, to let you know that you
    should really use a different feature.
    """

    def __init__(self, dep_version, module, name,
                 replacement_name, replacement_module=None):
        """Create a thunk for a feature that has been renamed or moved.

        :param dep_version: A format string with one %s slot; it is filled
            with 'module.name' to build the deprecation message.
        :param module: Name of the module holding the deprecated feature.
        :param name: Name of the deprecated feature.
        :param replacement_name: Name of the feature to use instead.
        :param replacement_module: Name of the module holding the
            replacement; defaults to module.
        """
        super(_CompatabilityThunkFeature, self).__init__()
        if replacement_module is None:
            replacement_module = module
        self._dep_version = dep_version
        self._module = module
        self._name = name
        self._replacement_module = replacement_module
        self._replacement_name = replacement_name
        # The replacement feature, looked up on first use.
        self._feature = None

    def _ensure(self):
        """Warn about the deprecation and load the replacement, once."""
        if self._feature is not None:
            return
        old_path = '%s.%s' % (self._module, self._name)
        depr_msg = self._dep_version % (old_path,)
        use_msg = ' Use %s.%s instead.' % (self._replacement_module,
                                           self._replacement_name)
        symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
        # Import the new feature and use it as a replacement for the
        # old one.
        self._feature = pyutils.get_named_object(
            self._replacement_module, self._replacement_name)

    def _probe(self):
        self._ensure()
        return self._feature._probe()
4448
class ModuleAvailableFeature(Feature):
    """This is a feature that describes a module we want to be available.

    Declare the name of the module in __init__(), and then after probing, the
    module will be available as 'self.module'.

    :ivar module: The module if it is available, else None.
    """

    def __init__(self, module_name):
        super(ModuleAvailableFeature, self).__init__()
        self.module_name = module_name

    def _probe(self):
        try:
            # The non-empty fromlist makes __import__ return the named
            # module itself rather than its top-level package.
            imported = __import__(self.module_name, {}, {}, [''])
        except ImportError:
            return False
        self._module = imported
        return True

    @property
    def module(self):
        # Calling available() makes sure the probe has been done.
        if not self.available():
            return None
        return self._module

    def feature_name(self):
        return self.module_name
4478
def probe_unicode_in_user_encoding():
    """Try to encode several unicode strings to use in unicode-aware tests.

    Return the first successful match.

    :return: (unicode value, encoded plain string value) or (None, None)
    """
    for uni_val in (u'm\xb5', u'\xe1', u'\u0410'):
        try:
            str_val = uni_val.encode(osutils.get_user_encoding())
        except UnicodeEncodeError:
            # Try a different character
            continue
        return uni_val, str_val
    # None of the candidates can be represented in the user encoding.
    return None, None
4496
def probe_bad_non_ascii(encoding):
    """Try to find a [bad] character with code [128..255]
    that cannot be decoded to unicode in some encoding.

    :param encoding: Name of the encoding to test.
    :return: The first such one-byte string, or None if all non-ascii
        characters are valid for the given encoding.
    """
    for code in xrange(128, 256):
        candidate = chr(code)
        try:
            candidate.decode(encoding)
        except UnicodeDecodeError:
            return candidate
    return None
4511
class _HTTPSServerFeature(Feature):
    """Some tests want an https Server, check if one is available.

    Right now, the only way this is available is under python2.6 which provides
    the ssl module.
    """

    def _probe(self):
        try:
            import ssl
        except ImportError:
            return False
        return True

    def feature_name(self):
        return 'HTTPSServer'


HTTPSServerFeature = _HTTPSServerFeature()
4532
class _UnicodeFilename(Feature):
    """Does the filesystem support Unicode filenames?"""

    def _probe(self):
        try:
            os.stat(u'\u03b1')
        except UnicodeEncodeError:
            # The name cannot even be encoded for the filesystem.
            return False
        except (IOError, OSError):
            # The filesystem allows the Unicode filename but the file doesn't
            # exist.
            pass
        # Either the file does not exist, or the filesystem allows the
        # Unicode filename and the file exists for some reason.
        return True


UnicodeFilename = _UnicodeFilename()
4552
class _ByteStringNamedFilesystem(Feature):
    """Is the filesystem based on bytes?"""

    def _probe(self):
        # Decided from os.name alone: "posix" platforms count as byte based.
        return os.name == "posix"


ByteStringNamedFilesystem = _ByteStringNamedFilesystem()
4563
class _UTF8Filesystem(Feature):
    """Is the filesystem UTF-8?"""

    def _probe(self):
        # Compare case-insensitively, accepting both spellings of the name.
        return osutils._fs_enc.upper() in ('UTF-8', 'UTF8')


UTF8Filesystem = _UTF8Filesystem()
4574
class _BreakinFeature(Feature):
    """Does this platform support the breakin feature?"""

    def _probe(self):
        """Check for a usable break-in signal, and for ctypes on win32.

        :return: False when bzrlib.breakin finds no signal to use, or when
            ctypes cannot be imported on win32; True otherwise.
        """
        from bzrlib import breakin
        if breakin.determine_signal() is None:
            return False
        if sys.platform == 'win32':
            # Windows doesn't have os.kill, and we catch the SIGBREAK signal.
            # We trigger SIGBREAK via a Console api so we need ctypes to
            # access the function
            try:
                import ctypes
            except (ImportError, OSError):
                # A missing ctypes raises ImportError; treat it the same as
                # an OSError so the probe reports "unavailable" instead of
                # blowing up.
                return False
        return True

    def feature_name(self):
        return "SIGQUIT or SIGBREAK w/ctypes on win32"


BreakinFeature = _BreakinFeature()
4598
class _CaseInsCasePresFilenameFeature(Feature):
    """Is the file-system case insensitive, but case-preserving?"""

    def _probe(self):
        # The probe file is created with a mixed-case prefix, and is closed
        # and removed again whatever the outcome.
        fileno, name = tempfile.mkstemp(prefix='MixedCase')
        try:
            name = osutils.normpath(name)
            base, rel = osutils.split(name)
            # first check truly case-preserving for created files...
            if osutils.canonical_relpath(base, name) != rel:
                return False
            # ...then check case insensitive when opening existing files.
            return (os.path.isfile(name.upper())
                    and os.path.isfile(name.lower()))
        finally:
            os.close(fileno)
            os.remove(name)

    def feature_name(self):
        return "case-insensitive case-preserving filesystem"


CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
4622
class _CaseInsensitiveFilesystemFeature(Feature):
    """Check if underlying filesystem is case-insensitive but *not* case
    preserving.
    """
    # Note that on Windows, Cygwin, MacOS etc, the file-systems are far
    # more likely to be case preserving, so this case is rare.

    def _probe(self):
        if CaseInsCasePresFilenameFeature.available():
            # Case preserving filesystems are reported by the other feature.
            return False

        # Probe below the shared test root, creating it if no test has
        # done so yet.
        root = TestCaseWithMemoryTransport.TEST_ROOT
        if root is None:
            root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
            TestCaseWithMemoryTransport.TEST_ROOT = root
        tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
                               dir=root)
        name_a = osutils.pathjoin(tdir, 'a')
        name_A = osutils.pathjoin(tdir, 'A')
        # Create the lower-case directory and see whether the upper-case
        # name reaches it.
        os.mkdir(name_a)
        result = osutils.isdir(name_A)
        _rmtree_temp_dir(tdir)
        return result

    def feature_name(self):
        return 'case-insensitive filesystem'


CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4653
class _CaseSensitiveFilesystemFeature(Feature):
    """Is the filesystem case-sensitive?

    That is, neither of the two case-insensitive features is available.
    """

    def _probe(self):
        if CaseInsCasePresFilenameFeature.available():
            return False
        if CaseInsensitiveFilesystemFeature.available():
            return False
        return True

    def feature_name(self):
        return 'case-sensitive filesystem'


# new coding style is for feature instances to be lowercase
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
4670
# Only define SubUnitBzrRunner if subunit is available.
try:
    from subunit import TestProtocolClient
    from subunit.test_results import AutoTimingTestResultDecorator

    class SubUnitBzrProtocolClient(TestProtocolClient):
        """Subunit protocol client leaving the log out of successes."""

        def addSuccess(self, test, details=None):
            # The subunit client always includes the details in the subunit
            # stream, but we don't want to include it in ours.
            if details is not None and 'log' in details:
                del details['log']
            parent = super(SubUnitBzrProtocolClient, self)
            return parent.addSuccess(test, details)

    class SubUnitBzrRunner(TextTestRunner):
        """Runner reporting results as a subunit stream on self.stream."""

        def run(self, test):
            client = SubUnitBzrProtocolClient(self.stream)
            result = AutoTimingTestResultDecorator(client)
            test.run(result)
            return result
except ImportError:
    pass
4693
class _PosixPermissionsFeature(Feature):
4697
# create temporary file and check if specified perms are maintained.
4700
write_perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
4701
f = tempfile.mkstemp(prefix='bzr_perms_chk_')
4704
os.chmod(name, write_perms)
4706
read_perms = os.stat(name).st_mode & 0777
4708
return (write_perms == read_perms)
4710
return (os.name == 'posix') and has_perms()
4712
def feature_name(self):
4713
return 'POSIX permissions support'
4715
posix_permissions_feature = _PosixPermissionsFeature()