~bzr-pqm-devel/bzr-pqm/devel

« back to all changes in this revision

Viewing changes to tests/test_lpland.py

  • Committer: Aaron Bentley
  • Date: 2012-03-01 14:22:22 UTC
  • mfrom: (86.1.5 autoland-improvements)
  • Revision ID: aaron@aaronbentley.com-20120301142222-8ja3wwkjce7isoo2
Merge improvements from Launchpad's autoland module.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright 2010 Canonical Ltd.  This software is licensed under the
 
1
# Copyright 2010-2012 Canonical Ltd.  This software is licensed under the
2
2
# GNU Affero General Public License version 3 (see the file LICENSE).
3
3
 
4
4
"""Tests for automatic landing thing."""
6
6
__metaclass__ = type
7
7
 
8
8
from cStringIO import StringIO
 
9
import re
9
10
from textwrap import dedent
10
11
import unittest
11
12
 
15
16
)
16
17
from bzrlib.plugins.pqm.lpland import (
17
18
    get_bugs_clause,
 
19
    get_email,
18
20
    get_reviewer_clause,
19
21
    get_reviewer_handle,
20
22
    get_testfix_clause,
21
23
    get_qa_clause,
 
24
    LaunchpadBranchLander,
22
25
    MissingReviewError,
23
26
    MissingBugsError,
24
27
    MissingBugsIncrementalError,
25
28
    MergeProposal,
26
29
    Submitter,
27
30
)
28
 
from bzrlib.tests import TestCaseWithTransport
29
 
 
30
 
from fakemethod import FakeMethod
 
31
from bzrlib.tests import (
 
32
    TestCase,
 
33
    TestCaseWithTransport,
 
34
    )
 
35
 
 
36
from bzrlib.plugins.pqm.tests.fakemethod import FakeMethod
 
37
 
 
38
DEFAULT=object()
 
39
 
 
40
class FakeLaunchpad:
 
41
 
 
42
    def __init__(self, branches=None):
 
43
        self.branches = FakeLaunchpadBranches(branches)
 
44
        self.me = FakePerson()
 
45
 
 
46
 
 
47
class FakeBugTask:
 
48
 
 
49
    def __init__(self, target_name, status):
 
50
        self.bug_target_name = target_name
 
51
        self.status = status
31
52
 
32
53
 
33
54
class FakeBug:
36
57
    Only used for the purposes of testing.
37
58
    """
38
59
 
39
 
    def __init__(self, id):
 
60
    def __init__(self, id, bug_tasks=None):
40
61
        self.id = id
 
62
        if bug_tasks is None:
 
63
            bug_tasks = [FakeBugTask('launchpad', 'Triaged')]
 
64
        self.bug_tasks = bug_tasks
 
65
 
 
66
 
 
67
class FakeEmailAddress:
 
68
 
 
69
    def __init__(self, email):
 
70
        self.email = email
41
71
 
42
72
 
43
73
class FakePerson:
46
76
    Only used for the purposes of testing.
47
77
    """
48
78
 
49
 
    def __init__(self, name='jrandom', irc_handles=()):
 
79
    def __init__(self, name='jrandom', irc_handles=(), email=DEFAULT):
50
80
        self.name = name
51
81
        self.irc_nicknames = list(irc_handles)
 
82
        if email is not DEFAULT:
 
83
            self.preferred_email_address = email
 
84
        else:
 
85
            self.preferred_email_address = FakeEmailAddress(
 
86
                'jrandom@example.org')
52
87
 
53
88
 
54
89
class FakeIRC:
62
97
        self.network = network
63
98
 
64
99
 
 
100
class FakeBzrBranch:
 
101
 
 
102
    def __init__(self):
 
103
        pass
 
104
 
 
105
    def get_public_branch(self):
 
106
        return 'public'
 
107
 
 
108
 
 
109
class FakeLaunchpadBranches:
 
110
 
 
111
    def __init__(self, branches):
 
112
        self.branches = branches
 
113
 
 
114
    def getByUrl(self, url):
 
115
        for branch in self.branches:
 
116
            if branch.location == url:
 
117
                return branch
 
118
 
 
119
 
65
120
class FakeBranch:
66
121
 
67
122
    def __init__(self, location):
68
123
        self.location = location
69
124
        self.linked_bugs = [FakeBug(5)]
 
125
        self.landing_targets = []
 
126
        self.owner = FakePerson()
70
127
 
71
128
    def composePublicURL(self, scheme):
72
129
        return self.location
92
149
    Only used for the purposes of testing.
93
150
    """
94
151
 
95
 
    def __init__(self, root=None):
 
152
    def __init__(self, root=None, queue_status='Approved', votes=None,
 
153
                 reviewer=None):
 
154
        if root is None:
 
155
            root = FakeLaunchpad()
96
156
        self._root = root
97
157
        self.source_branch = FakeBranch('lp_source')
98
158
        self.target_branch = FakeBranch('lp_target')
99
159
        self.commit_message = 'Message1'
100
 
        self.votes = [FakeVote()]
 
160
        if votes is not None:
 
161
            self.votes = votes
 
162
        else:
 
163
            self.votes = [FakeVote()]
 
164
        self.queue_status = queue_status
 
165
        self.reviewer = reviewer
 
166
 
 
167
    def lp_save(self):
 
168
        pass
 
169
 
 
170
 
 
171
class TestGetEmail(TestCase):
 
172
 
 
173
    def test_get_email(self):
 
174
        self.assertEqual('jrandom@example.org', get_email(FakePerson()))
 
175
 
 
176
    def test_get_email_none(self):
 
177
        self.assertIs(None, get_email(FakePerson(email=None)))
 
178
 
 
179
 
 
180
class TestPQMRegexAcceptance(unittest.TestCase):
 
181
    """Tests if the generated commit message is accepted by PQM regexes."""
 
182
 
 
183
    def setUp(self):
 
184
        # PQM regexes; might need update once in a while
 
185
        self.devel_open_re = ("(?is)^\s*(:?\[testfix\])?\[(?:"
 
186
            "release-critical=[^\]]+|rs?=[^\]]+)\]")
 
187
        self.dbdevel_normal_re = ("(?is)^\s*(:?\[testfix\])?\[(?:"
 
188
            "release-critical|rs?=[^\]]+)\]")
 
189
 
 
190
        self.mp = MergeProposal(FakeLPMergeProposal())
 
191
        self.fake_bug = FakeBug(20)
 
192
        self.fake_person = FakePerson('foo', [])
 
193
        self.mp.get_bugs = FakeMethod([self.fake_bug])
 
194
        self.mp.get_reviews = FakeMethod({None : [self.fake_person]})
 
195
 
 
196
    def assertRegexpMatches(self, text, expected_regexp, msg=None):
 
197
        """Fail the test unless the text matches the regular expression.
 
198
 
 
199
        Method default in Python 2.7. Can be removed as soon as LP goes 2.7.
 
200
        """
 
201
        if isinstance(expected_regexp, basestring):
 
202
            expected_regexp = re.compile(expected_regexp)
 
203
        if not expected_regexp.search(text):
 
204
            msg = msg or "Regexp didn't match"
 
205
            msg = '%s: %r not found in %r' % (msg, expected_regexp.pattern,
 
206
                text)
 
207
            raise self.failureException(msg)
 
208
 
 
209
    def _test_commit_message_match(self, incr, no_qa, testfix):
 
210
        commit_message = self.mp.get_commit_message("Foobaring the sbrubble.",
 
211
            testfix, no_qa, incr)
 
212
        self.assertRegexpMatches(commit_message, self.devel_open_re)
 
213
        self.assertRegexpMatches(commit_message, self.dbdevel_normal_re)
 
214
 
 
215
    def test_testfix_match(self):
 
216
        self._test_commit_message_match(incr=False, no_qa=False, testfix=True)
 
217
 
 
218
    def test_regular_match(self):
 
219
        self._test_commit_message_match(incr=False, no_qa=False, testfix=False)
 
220
 
 
221
    def test_noqa_match(self):
 
222
        self._test_commit_message_match(incr=False, no_qa=True, testfix=False)
 
223
 
 
224
    def test_incr_match(self):
 
225
        self._test_commit_message_match(incr=True, no_qa=False, testfix=False)
101
226
 
102
227
 
103
228
class TestBugsClaused(unittest.TestCase):
121
246
        bugs_clause = get_bugs_clause([bug1, bug2])
122
247
        self.assertEqual('[bug=20,45]', bugs_clause)
123
248
 
 
249
    def test_fixed_bugs_are_excluded(self):
 
250
        # If a bug is fixed then it is excluded from the bugs clause.
 
251
        bug1 = FakeBug(20)
 
252
        bug2 = FakeBug(45, bug_tasks=[
 
253
            FakeBugTask('fake-project', 'Fix Released')])
 
254
        bug3 = FakeBug(67, bug_tasks=[
 
255
            FakeBugTask('fake-project', 'Fix Committed')])
 
256
        bugs_clause = get_bugs_clause([bug1, bug2, bug3])
 
257
        self.assertEqual('[bug=20]', bugs_clause)
 
258
 
 
259
    def test_bugs_open_on_launchpad_are_included(self):
 
260
        # If a bug has been fixed on one target but not in launchpad, then it
 
261
        # is included in the bugs clause, because it's relevant to launchpad
 
262
        # QA.
 
263
        bug = FakeBug(20, bug_tasks=[
 
264
            FakeBugTask('fake-project', 'Fix Released'),
 
265
            FakeBugTask('launchpad', 'Triaged')])
 
266
        bugs_clause = get_bugs_clause([bug])
 
267
        self.assertEqual('[bug=20]', bugs_clause)
 
268
 
 
269
    def test_bugs_fixed_on_launchpad_but_open_in_others_are_excluded(self):
 
270
        # If a bug has been fixed in Launchpad but not fixed on a different
 
271
        # target, then it is excluded from the bugs clause, since we don't
 
272
        # want to QA it.
 
273
        bug = FakeBug(20, bug_tasks=[
 
274
            FakeBugTask('fake-project', 'Triaged'),
 
275
            FakeBugTask('launchpad', 'Fix Released')])
 
276
        bugs_clause = get_bugs_clause([bug])
 
277
        self.assertEqual('', bugs_clause)
 
278
 
124
279
 
125
280
class TestGetTestfixClause(unittest.TestCase):
126
281
    """Tests for `get_testfix_clause`"""
193
348
 
194
349
    def test_rollback_and_noqa_and_incr_given(self):
195
350
        bugs = None
196
 
        no_qa = True
197
 
        incr = True
198
 
        self.assertEqual('[rollback=123]',
199
 
            get_qa_clause(bugs, rollback=123))
 
351
        self.assertEqual('[rollback=123]', get_qa_clause(bugs, rollback=123))
200
352
 
201
353
 
202
354
class TestGetReviewerHandle(unittest.TestCase):
401
553
        self.assertRaises(MissingReviewError, self.get_reviewer_clause, {})
402
554
 
403
555
 
 
556
class TestLaunchpadBranchLander(TestCase):
 
557
 
 
558
    def get_lander(self, landing_targets=None):
 
559
        branch = FakeBranch('public')
 
560
        if landing_targets is not None:
 
561
            branch.landing_targets = landing_targets
 
562
        launchpad = FakeLaunchpad([branch])
 
563
        return LaunchpadBranchLander(launchpad)
 
564
 
 
565
    def test_get_merge_proposal_from_branch_no_proposals(self):
 
566
        lander = self.get_lander()
 
567
        branch = FakeBzrBranch()
 
568
        e = self.assertRaises(errors.BzrCommandError,
 
569
                              lander.get_merge_proposal_from_branch, branch)
 
570
        self.assertEqual('The public branch has no active source merge'
 
571
                         ' proposals.  You must have a merge proposal before'
 
572
                         ' attempting to land the branch.', str(e))
 
573
 
 
574
    def test_get_merge_proposal_one_proposal(self):
 
575
        proposal = FakeLPMergeProposal()
 
576
        lander = self.get_lander([proposal])
 
577
        branch = FakeBzrBranch()
 
578
        lander_proposal = lander.get_merge_proposal_from_branch(branch)
 
579
        self.assertIs(proposal, lander_proposal._mp)
 
580
 
 
581
    def test_get_merge_proposal_two_proposal(self):
 
582
        lander = self.get_lander([FakeLPMergeProposal(),
 
583
                                  FakeLPMergeProposal()])
 
584
        branch = FakeBzrBranch()
 
585
        e = self.assertRaises(errors.BzrCommandError,
 
586
                              lander.get_merge_proposal_from_branch, branch)
 
587
        self.assertEqual('The public branch has multiple active source merge'
 
588
                         ' proposals.  You must provide the URL to the one'
 
589
                         ' you wish to use.', str(e))
 
590
 
 
591
    def test_get_merge_proposal_inactive(self):
 
592
        for status in ['Rejected', 'Work in progress', 'Merged', 'Queued',
 
593
                       'Code failed to merge', 'Superseded']:
 
594
            proposal = FakeLPMergeProposal(queue_status=status)
 
595
            lander = self.get_lander([proposal])
 
596
            branch = FakeBzrBranch()
 
597
            e = self.assertRaises(errors.BzrCommandError,
 
598
                                  lander.get_merge_proposal_from_branch,
 
599
                                  branch)
 
600
            self.assertEqual('The public branch has no active source merge'
 
601
                             ' proposals.  You must have a merge proposal'
 
602
                             ' before attempting to land the branch.',
 
603
                             str(e))
 
604
 
 
605
    def test_get_merge_proposal_active(self):
 
606
        branch = FakeBzrBranch()
 
607
        for status in ['Approved', 'Needs review']:
 
608
            proposal = FakeLPMergeProposal(queue_status=status)
 
609
            lander = self.get_lander([proposal])
 
610
            lander_proposal = lander.get_merge_proposal_from_branch(branch)
 
611
            self.assertIs(proposal, lander_proposal._mp)
 
612
 
 
613
 
 
614
class TestMergeProposal(TestCase):
 
615
 
 
616
    def test_get_reviews_none_unapproved(self):
 
617
        """Reviewer is not considered for un-approved proposals."""
 
618
        reviewer = FakePerson()
 
619
        proposal = FakeLPMergeProposal(queue_status='Needs review', votes=[],
 
620
                                       reviewer=reviewer)
 
621
        mp = MergeProposal(proposal)
 
622
        approvals = mp.get_reviews()
 
623
        self.assertEqual({}, approvals)
 
624
 
 
625
    def test_get_reviews_none_approved(self):
 
626
        """Approving a proposal counts as a vote if no other approvals."""
 
627
        reviewer = FakePerson()
 
628
        proposal = FakeLPMergeProposal(queue_status='Approved', votes=[],
 
629
                                       reviewer=reviewer)
 
630
        mp = MergeProposal(proposal)
 
631
        approvals = mp.get_reviews()
 
632
        self.assertEqual({None: [reviewer]}, approvals)
 
633
 
 
634
    def test_get_reviews_one_approved(self):
 
635
        """As long as there is one approve vote, don't use reviewer."""
 
636
        reviewer = FakePerson()
 
637
        proposal = FakeLPMergeProposal(queue_status='Approved',
 
638
                                       votes=[FakeVote()],
 
639
                                       reviewer=reviewer)
 
640
        mp = MergeProposal(proposal)
 
641
        approvals = mp.get_reviews()
 
642
        self.assertEqual({None: [proposal.votes[0].reviewer]}, approvals)
 
643
 
 
644
    def test_get_stakeholder_emails(self):
 
645
        lp = FakeLaunchpad()
 
646
        mp = MergeProposal(FakeLPMergeProposal(root=lp))
 
647
        lp.me.preferred_email_address = FakeEmailAddress('lander@example.org')
 
648
        owner = mp._mp.source_branch.owner
 
649
        owner.preferred_email_address = FakeEmailAddress('owner@example.org')
 
650
        expected = set(['owner@example.org', 'lander@example.org'])
 
651
        emails = mp.get_stakeholder_emails()
 
652
        self.assertEqual(expected, emails)
 
653
 
 
654
    def test_get_stakeholder_emails_none(self):
 
655
        lp = FakeLaunchpad()
 
656
        mp = MergeProposal(FakeLPMergeProposal(root=lp))
 
657
        lp.me.preferred_email_address = FakeEmailAddress('lander@example.org')
 
658
        owner = mp._mp.source_branch.owner
 
659
        owner.preferred_email_address = None
 
660
        expected = set(['lander@example.org'])
 
661
        emails = mp.get_stakeholder_emails()
 
662
        self.assertEqual(expected, emails)
 
663
 
 
664
 
404
665
class TestSubmitter(TestCaseWithTransport):
405
666
 
406
667
    def make_submitter(self):