# Branch: ~muffinresearch/bzr-email/bzr-email-keychain

# Copyright (C) 2005 by Canonical Ltd
#   Authors: Robert Collins <robert.collins@canonical.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA

from StringIO import StringIO
import subprocess

import bzrlib.config as config
import bzrlib.errors as errors


class EmailSender(object):
    """Compose and send the notification email for one revision of a branch.

    The message body is the long-format log entry of the revision, optionally
    followed by its diff against the first parent.  Delivery is delegated to
    an external mail program run as a subprocess.

    Settings are read through ``config.get_user_option``:

    * ``post_commit_to`` -- recipient; nothing is sent when it is unset.
    * ``post_commit_sender`` -- From address (default: ``config.username()``).
    * ``post_commit_mailer`` -- mail program to run (default: ``mail``).
    * ``post_commit_url`` -- URL shown in the subject (default: the branch
      base).
    * ``post_commit_difflimit`` -- maximum number of diff lines to include
      (default: 1000; a value of 0 disables the diff).
    """

    def __init__(self, branch, revision_id, config):
        """Look up the revision to report on.

        :param branch: branch that contains the revision.
        :param revision_id: id of the revision to describe in the email.
        :param config: object providing ``get_user_option()`` and
            ``username()``.
        """
        self.config = config
        self.branch = branch
        self.revision = self.branch.repository.get_revision(revision_id)
        self.revno = self.branch.revision_id_to_revno(revision_id)

    def body(self):
        """Return the text of the email body.

        The body is the verbose 'long' log output for the revision, followed
        by the diff when ``difflimit()`` is non-zero.
        """
        from bzrlib.log import log_formatter, show_log

        # A revno of 0 is passed to show_log as "no revision bounds".
        rev1 = rev2 = self.revno
        if rev1 == 0:
            rev1 = None
            rev2 = None

        outf = StringIO()
        lf = log_formatter('long',
                           show_ids=True,
                           to_file=outf
                           )

        show_log(self.branch,
                 lf,
                 start_revision=rev1,
                 end_revision=rev2,
                 verbose=True
                 )

        # Read the option once so the check and the limit cannot disagree.
        difflimit = self.difflimit()
        if difflimit:
            self.add_diff(outf, difflimit)
        return outf.getvalue()

    def add_diff(self, outf, difflimit):
        """Add the diff from the commit to the output.

        If the diff has more than difflimit lines, it will be skipped and a
        short notice is written instead.

        :param outf: file-like object the diff (or the notice) is written to.
        :param difflimit: maximum number of diff lines to include; a false
            value (0 or None) means no diff is wanted and nothing is written.
        """
        from bzrlib.diff import show_diff_trees

        if not difflimit:
            # No diff wanted: do not pay for computing one.
            return

        new_revid = self.revision.revision_id
        if self.revision.parent_ids:
            old_revid = self.revision.parent_ids[0]
        else:
            # NOTE(review): some bzrlib versions may reject None in
            # revision_trees(); confirm against the bzrlib in use.
            old_revid = None
        new_tree, old_tree = self.branch.repository.revision_trees(
            (new_revid, old_revid))
        diff_content = StringIO()
        # show_diff_trees takes the old tree first; passing the trees the
        # other way round yields a reversed diff.
        show_diff_trees(old_tree, new_tree, diff_content)
        diff_text = diff_content.getvalue()
        numlines = len(diff_text.split("\n"))
        if numlines <= difflimit:
            outf.write(diff_text)
        else:
            outf.write("\nDiff too large for email (%d, the limit is %d).\n"
                % (numlines, difflimit))

    def difflimit(self):
        """Return the maximum number of diff lines to show, as an int.

        Read from the 'post_commit_difflimit' option; defaults to 1000.
        """
        result = self.config.get_user_option('post_commit_difflimit')
        if result is None:
            result = 1000
        return int(result)

    def mailer(self):
        """Return the mail program to use.

        Read from the 'post_commit_mailer' option; defaults to "mail".
        """
        result = self.config.get_user_option('post_commit_mailer')
        if result is None:
            result = "mail"
        return result

    def _command_line(self):
        """Return the argument list used to run the mail program."""
        return [self.mailer(), '-s', self.subject(), '-a',
                "From: " + self.from_address(), self.to()]

    def to(self):
        """Return the address the mail should go to, or None if unset."""
        return self.config.get_user_option('post_commit_to')

    def url(self):
        """Return the URL to display in the subject of the mail.

        Read from the 'post_commit_url' option; defaults to the branch base.
        """
        url = self.config.get_user_option('post_commit_url')
        if url is None:
            url = self.branch.base
        return url

    def from_address(self):
        """Return the address the mail is sent from.

        Read from the 'post_commit_sender' option; defaults to the
        configured username.
        """
        result = self.config.get_user_option('post_commit_sender')
        if result is None:
            result = self.config.username()
        return result

    def send(self):
        """Run the mail program and feed it the UTF-8 encoded body.

        :return: the first element of ``Popen.communicate()``'s result.
        :raises errors.BzrError: if the mail program cannot be found, exits
            with a non-zero status, or closes its input early (EPIPE).
        """
        # TODO think up a good test for this, but I think it needs
        # a custom binary shipped with. RBC 20051021
        #
        # errno is imported here because the handlers below need it and the
        # module does not import it at file level.  The active exception is
        # fetched with sys.exc_info() rather than bound in the except clause
        # so that the syntax is valid on every Python version.
        import errno
        import sys

        # A ValueError (bad subprocess parameters, should never happen) is
        # deliberately left to propagate unchanged.
        try:
            process = subprocess.Popen(self._command_line(),
                                       stdin=subprocess.PIPE)
            try:
                result = process.communicate(self.body().encode('utf8'))[0]
                if process.returncode is None:
                    process.wait()
                if process.returncode != 0:
                    raise errors.BzrError("Failed to send email")
                return result
            except OSError:
                e = sys.exc_info()[1]
                if e.errno == errno.EPIPE:
                    raise errors.BzrError("Failed to send email.")
                else:
                    raise
        except OSError:
            e = sys.exc_info()[1]
            if e.errno == errno.ENOENT:
                raise errors.BzrError("mail is not installed !?")
            else:
                raise

    def should_send(self):
        """Return True when both a recipient and a sender are known."""
        return self.to() is not None and self.from_address() is not None

    def send_maybe(self):
        """Send the email only if ``should_send()`` allows it."""
        if self.should_send():
            self.send()

    def subject(self):
        """Return the subject line: revno, revision summary and URL."""
        return ("Rev %d: %s in %s" %
                (self.revno,
                 self.revision.get_summary(),
                 self.url()))


def post_commit(branch, revision_id):
    """Email the details of ``revision_id`` if the branch is set up for it.

    An EmailSender is built from the branch's own configuration and asked to
    send; it sends nothing unless both a recipient and a sender are known.
    """
    branch_config = config.BranchConfig(branch)
    sender = EmailSender(branch, revision_id, branch_config)
    sender.send_maybe()


def test_suite():
    """Return a TestSuite wrapping the suite of the plugin's tests package."""
    from unittest import TestSuite
    import bzrlib.plugins.email.tests as email_tests

    suite = TestSuite()
    suite.addTest(email_tests.test_suite())
    return suite