~bzr/ubuntu/lucid/bzr/beta-ppa

« back to all changes in this revision

Viewing changes to bzrlib/tests/per_repository/test_write_group.py

  • Committer: Martin Pool
  • Date: 2010-08-18 04:26:39 UTC
  • mfrom: (129.1.8 packaging-karmic)
  • Revision ID: mbp@sourcefrog.net-20100818042639-mjoxtngyjwiu05fo
* PPA rebuild for lucid.
* PPA rebuild for karmic.
* PPA rebuild onto jaunty.
* New upstream release.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2007 Canonical Ltd
 
1
# Copyright (C) 2007-2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
19
19
import sys
20
20
 
21
21
from bzrlib import (
 
22
    branch,
22
23
    bzrdir,
23
24
    errors,
24
25
    graph,
25
26
    memorytree,
26
27
    osutils,
27
28
    remote,
 
29
    tests,
28
30
    versionedfile,
29
31
    )
30
 
from bzrlib.branch import BzrBranchFormat7
31
 
from bzrlib.transport import local, memory
32
 
from bzrlib.tests import TestNotApplicable
33
 
from bzrlib.tests.per_repository import TestCaseWithRepository
34
 
 
35
 
 
36
 
class TestWriteGroup(TestCaseWithRepository):
 
32
from bzrlib.tests import (
 
33
    per_repository,
 
34
    test_server,
 
35
    )
 
36
from bzrlib.transport import memory
 
37
 
 
38
 
 
39
class TestWriteGroup(per_repository.TestCaseWithRepository):
37
40
 
38
41
    def test_start_write_group_unlocked_needs_write_lock(self):
39
42
        repo = self.make_repository('.')
114
117
        repo.unlock()
115
118
 
116
119
    def test_abort_write_group_does_not_raise_when_suppressed(self):
117
 
        if self.transport_server is local.LocalURLServer:
 
120
        if self.transport_server is test_server.LocalURLServer:
118
121
            self.transport_server = None
119
122
        self.vfs_transport_factory = memory.MemoryServer
120
123
        repo = self.make_repository('repo')
131
134
        self.assertEqual(None, repo.abort_write_group(suppress_errors=True))
132
135
 
133
136
 
134
 
class TestGetMissingParentInventories(TestCaseWithRepository):
 
137
class TestGetMissingParentInventories(per_repository.TestCaseWithRepository):
135
138
 
136
139
    def test_empty_get_missing_parent_inventories(self):
137
140
        """A new write group has no missing parent inventories."""
183
186
        else:
184
187
            repo = self.make_repository(relpath)
185
188
        if not repo._format.supports_external_lookups:
186
 
            raise TestNotApplicable('format not stackable')
187
 
        repo.bzrdir._format.set_branch_format(BzrBranchFormat7())
 
189
            raise tests.TestNotApplicable('format not stackable')
 
190
        repo.bzrdir._format.set_branch_format(branch.BzrBranchFormat7())
188
191
        return repo
189
192
 
190
193
    def reopen_repo_and_resume_write_group(self, repo):
334
337
        repo = self.make_repository('test-repo')
335
338
        if (not repo._format.supports_external_lookups or
336
339
            isinstance(repo, remote.RemoteRepository)):
337
 
            raise TestNotApplicable(
 
340
            raise tests.TestNotApplicable(
338
341
                'only valid for direct connections to resumable repos')
339
342
        # log calls to get_missing_parent_inventories, so that we can assert it
340
343
        # is called with the correct parameters
361
364
        self.assertEqual([True], call_log)
362
365
 
363
366
 
364
 
class TestResumeableWriteGroup(TestCaseWithRepository):
 
367
class TestResumeableWriteGroup(per_repository.TestCaseWithRepository):
365
368
 
366
369
    def make_write_locked_repo(self, relpath='repo'):
367
370
        repo = self.make_repository(relpath)
384
387
            wg_tokens = repo.suspend_write_group()
385
388
        except errors.UnsuspendableWriteGroup:
386
389
            repo.abort_write_group()
387
 
            raise TestNotApplicable(reason)
 
390
            raise tests.TestNotApplicable(reason)
388
391
 
389
392
    def test_suspend_write_group(self):
390
393
        repo = self.make_write_locked_repo()