~jelmer/bzr-pqm/lp-land-supports-child_pqm_email

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
# Copyright (C) 2005-2008 by Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""Submit an email to a Patch Queue Manager"""

from bzrlib import (
    config as _mod_config,
    errors,
    gpg,
    osutils,
    urlutils,
    )
from bzrlib.branch import Branch
from bzrlib.email_message import EmailMessage
from bzrlib.smtp_connection import SMTPConnection
from bzrlib.trace import note, warning


class BadCommitMessage(errors.BzrError):
    """Raised when a commit message cannot be passed on to PQM.

    Within this module it is raised by PQMSubmission when the encoded
    commit message contains a newline.
    """

    # Message template; the %(msg)r placeholder is presumably filled in from
    # the ``msg`` attribute set in __init__ by BzrError's formatting -- confirm
    # against bzrlib.errors.
    _fmt = "The commit message %(msg)r cannot be used by pqm."

    def __init__(self, message):
        """Record the offending commit message.

        :param message: the commit message that was rejected
        """
        errors.BzrError.__init__(self)
        # Stored under the name used by the _fmt template above.
        self.msg = message


class NoPQMSubmissionAddress(errors.BzrError):
    """Raised when no PQM submission email address is known for a branch."""

    _fmt = "No PQM submission email address specified for %(branch_url)s."

    def __init__(self, branch):
        """Build the error for ``branch``.

        :param branch: the branch that was being submitted, or None. When
            it is None, or its ``base`` is None, the placeholder '(none)'
            is reported instead of a URL.
        """
        has_base = (branch is not None) and (branch.base is not None)
        if has_base:
            shown_url = urlutils.unescape_for_display(branch.base, 'ascii')
        else:
            shown_url = '(none)'
        errors.BzrError.__init__(self, branch_url=shown_url)


class PQMEmailMessage(EmailMessage):
    """An EmailMessage whose Subject header holds the raw UTF-8 subject.

    PQM doesn't support proper email subjects, so after the normal
    EmailMessage setup the Subject header is overwritten with the plain
    UTF-8 encoding of the subject.
    """

    def __init__(self, from_address, to_address, subject, body=None):
        """Create the message.

        :param from_address: the sender address
        :param to_address: the recipient address
        :param subject: the subject text
        :param body: optional message body
        """
        parent_args = dict(from_address=from_address,
                           to_address=to_address,
                           subject=subject,
                           body=body)
        EmailMessage.__init__(self, **parent_args)
        # Replace whatever the parent stored with the raw utf-8 subject.
        raw_subject = osutils.safe_unicode(subject).encode('UTF-8')
        self._headers['Subject'] = raw_subject


class PQMSubmission(object):
    """A request to perform a PQM merge into a branch.

    The constructor resolves the public location of the source branch and
    the location of the merge target, and validates the commit message.
    The remaining methods run the pre-submission checks and serialise the
    request as lines, as signed text, or as an email message.
    """

    def __init__(self, source_branch, public_location=None,
                 submit_location=None, message=None,
                 tree=None):
        """Create a PQMSubmission object.

        :param source_branch: the source branch for the merge. May be None
            only if public_location is given.
        :param public_location: the public location of the source branch
        :param submit_location: the location of the target branch
        :param message: The message to use when committing this merge.
            Required; it must not contain a newline.
        :param tree: A WorkingTree or None. If not None, check_tree() will
            check the WT for uncommitted changes.
        :raises errors.NoMergeSource: if both source_branch and
            public_location are None.
        :raises errors.NoPublicBranch: if no public location can be found
            for the source branch.
        :raises errors.BzrError: if submit_location is omitted and there is
            no source branch to look it up from.
        :raises errors.NoSubmitBranch: if no submit location is configured.
        :raises AssertionError: if message is None.
        :raises BadCommitMessage: if the message contains a newline.

        If public_location or submit_location are omitted, they will be
        calculated from source_branch. The message is never calculated; it
        must always be supplied.
        """
        if source_branch is None and public_location is None:
            raise errors.NoMergeSource()
        self.source_branch = source_branch
        self.tree = tree

        if public_location is None:
            public_location = self.source_branch.get_public_branch()
            # Fall back to the old public_repository hack.
            if public_location is None:
                src_loc = source_branch.bzrdir.root_transport.local_abspath('.')
                repository = source_branch.repository
                repo_loc = repository.bzrdir.root_transport.local_abspath('.')
                repo_config = _mod_config.LocationConfig(repo_loc)
                public_repo = repo_config.get_user_option("public_repository")
                if public_repo is not None:
                    warning("Please use public_branch, not public_repository, "
                            "to set the public location of branches.")
                    # The branch's path relative to its repository is joined
                    # onto the public repository URL.
                    branch_relpath = osutils.relpath(repo_loc, src_loc)
                    public_location = urlutils.join(public_repo, branch_relpath)

            if public_location is None:
                raise errors.NoPublicBranch(self.source_branch)
        self.public_location = public_location

        if submit_location is None:
            if self.source_branch is None:
                raise errors.BzrError(
                    "Cannot determine submit location to use.")
            config = self.source_branch.get_config()
            # First check the deprecated pqm_branch config key:
            submit_location = config.get_user_option('pqm_branch')
            if submit_location is not None:
                warning("Please use submit_branch, not pqm_branch to set "
                        "the PQM merge target branch.")
            else:
                # Otherwise, use the standard config key:
                submit_location = self.source_branch.get_submit_branch()

            if submit_location is None:
                raise errors.NoSubmitBranch(self.source_branch)
            # See if the submit_location has a public branch
            try:
                submit_branch = Branch.open(submit_location)
            except errors.NotBranchError:
                # The target cannot be opened as a branch from here; keep the
                # configured location as it is.
                pass
            else:
                submit_public_location = submit_branch.get_public_branch()
                if submit_public_location is not None:
                    submit_location = submit_public_location
        self.submit_location = submit_location

        # Check that the message is okay to pass to PQM.
        # An explicit raise is used rather than an assert statement so that
        # the check still happens when Python runs with -O.
        if message is None:
            raise AssertionError("message must not be None")
        self.message = message.encode('utf8')
        if '\n' in self.message:
            raise BadCommitMessage(self.message)

    def check_tree(self):
        """Check that the working tree has no uncommitted changes.

        Does nothing when no tree was supplied.

        :raises errors.UncommittedChanges: if the tree differs from its
            basis tree.
        """
        if self.tree is None:
            return
        note('Checking the working tree is clean ...')
        self.tree.lock_read()
        try:
            basis_tree = self.tree.basis_tree()
            basis_tree.lock_read()
            try:
                for change in self.tree.iter_changes(basis_tree):
                    # If we have any changes, the tree is not clean
                    raise errors.UncommittedChanges(self.tree)
            finally:
                basis_tree.unlock()
        finally:
            self.tree.unlock()

    def check_public_branch(self):
        """Check that the public branch is up to date with the local copy.

        Does nothing when there is no local source branch to compare with
        (the constructor allows source_branch to be None when a
        public_location is given).

        :raises errors.PublicBranchOutOfDate: if the last revision of the
            public branch differs from that of the source branch.
        """
        if self.source_branch is None:
            # Only a public location was supplied, so there is no local copy
            # to compare against.
            return
        note('Checking that the public branch is up to date at\n    %s',
             urlutils.unescape_for_display(self.public_location, 'utf-8'))
        local_revision = self.source_branch.last_revision()
        public_revision = Branch.open(self.public_location).last_revision()
        if local_revision != public_revision:
            raise errors.PublicBranchOutOfDate(
                self.public_location, local_revision)

    def to_lines(self):
        """Serialise as a list of lines.

        :return: a one-element list holding the 'star-merge' command line
            built from the public and submit locations.
        """
        return ['star-merge %s %s\n' % (self.public_location, self.submit_location)]

    def to_signed(self):
        """Serialize as a signed string.

        The signing configuration comes from the source branch when there
        is one, and from the global configuration otherwise.

        :return: the result of signing the serialised lines
        """
        unsigned_text = ''.join(self.to_lines())
        unsigned_text = unsigned_text.encode('ascii') #URLs should be ascii

        if self.source_branch:
            config = self.source_branch.get_config()
        else:
            config = _mod_config.GlobalConfig()
        strategy = gpg.GPGStrategy(config)
        return strategy.sign(unsigned_text)

    def to_email(self, mail_from, mail_to, sign=True):
        """Serialize as an email message.

        The commit message is used as the email subject.

        :param mail_from: The from address for the message
        :param mail_to: The address to send the message to
        :param sign: If True, gpg-sign the email
        :return: an email message
        """
        if sign:
            body = self.to_signed()
        else:
            body = ''.join(self.to_lines())
        message = PQMEmailMessage(mail_from, mail_to, self.message, body)
        return message


class StackedConfig(_mod_config.Config):
    """A Config that answers lookups from an ordered list of other configs.

    Sources are consulted in the order they were added; the first value
    that is not None is returned.
    """

    def __init__(self):
        super(StackedConfig, self).__init__()
        self._sources = []

    def add_source(self, source):
        """Append ``source`` to the end of the lookup order."""
        self._sources.append(source)

    def _first_from_sources(self, lookup):
        """Return the first non-None result of lookup(source), else None."""
        for candidate in self._sources:
            found = lookup(candidate)
            if found is not None:
                return found
        return None

    def _get_user_option(self, option_name):
        """See Config._get_user_option."""
        return self._first_from_sources(
            lambda src: src._get_user_option(option_name))

    def _get_user_id(self):
        """Return the first user id reported by any source, else None."""
        return self._first_from_sources(lambda src: src._get_user_id())


def submit(branch, message, dry_run=False, public_location=None,
           submit_location=None, tree=None, ignore_local=False):
    """Submit the given branch to the pqm.

    :param branch: the source branch, or a false value when only
        public_location is known
    :param message: the commit message for the merge
    :param dry_run: if True, print the email instead of sending it
    :param public_location: the public location of the source branch
    :param submit_location: the location of the target branch
    :param tree: a working tree to check for uncommitted changes, or None
    :param ignore_local: if True, skip the working tree and public branch
        checks
    :raises NoPQMSubmissionAddress: if no 'pqm_email' option is configured
    """
    stacked = StackedConfig()
    if not branch:
        # Without a branch, use the location config for the public location
        # (when given), then the global config.
        if public_location:
            stacked.add_source(_mod_config.LocationConfig(public_location))
        stacked.add_source(_mod_config.GlobalConfig())
    else:
        stacked.add_source(branch.get_config())

    request = PQMSubmission(
        source_branch=branch,
        public_location=public_location,
        submit_location=submit_location,
        message=message,
        tree=tree)

    # Sender: the dedicated option if set, otherwise the configured username.
    sender = (stacked.get_user_option('pqm_user_email')
              or stacked.username())
    sender = sender.encode('utf8') # Make sure this isn't unicode
    recipient = stacked.get_user_option('pqm_email')
    if not recipient:
        raise NoPQMSubmissionAddress(branch)
    recipient = recipient.encode('utf8') # same here

    if not ignore_local:
        request.check_tree()
        request.check_public_branch()

    email = request.to_email(sender, recipient)

    if dry_run:
        # Single parenthesised argument: prints the same text as the bare
        # print statement.
        print(email.as_string())
    else:
        SMTPConnection(stacked).send_email(email)