1
# Copyright (C) 2011-2015 by the Free Software Foundation, Inc.
3
# This file is part of GNU Mailman.
5
# GNU Mailman is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free
7
# Software Foundation, either version 3 of the License, or (at your option)
# any later version.
10
# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT
11
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
15
# You should have received a copy of the GNU General Public License along with
16
# GNU Mailman. If not, see <http://www.gnu.org/licenses/>.
18
"""Moderation tests."""
27
from mailman.app.lifecycle import create_list
28
from mailman.app.moderator import handle_message, hold_message
29
from mailman.interfaces.action import Action
30
from mailman.interfaces.messages import IMessageStore
31
from mailman.interfaces.requests import IListRequests
32
from mailman.runners.incoming import IncomingRunner
33
from mailman.runners.outgoing import OutgoingRunner
34
from mailman.runners.pipeline import PipelineRunner
35
from mailman.testing.helpers import (
36
get_queue_messages, make_testable_runner, specialized_message_from_string)
37
from mailman.testing.layers import SMTPLayer
38
from mailman.utilities.datetime import now
39
from zope.component import getUtility
43
class TestModeration(unittest.TestCase):
44
"""Test moderation functionality."""
49
self._mlist = create_list('test@example.com')
50
self._request_db = IListRequests(self._mlist)
51
self._msg = specialized_message_from_string("""\
52
From: anne@example.com
58
self._in = make_testable_runner(IncomingRunner, 'in')
59
self._pipeline = make_testable_runner(PipelineRunner, 'pipeline')
60
self._out = make_testable_runner(OutgoingRunner, 'out')
62
def test_accepted_message_gets_posted(self):
    # A message that is accepted by the moderator should get posted to the
    # mailing list.  LP: #827697
    msgdata = dict(listname='test@example.com',
                   recipients=['bart@example.com'])
    request_id = hold_message(self._mlist, self._msg, msgdata)
    handle_message(self._mlist, request_id, Action.accept)
    # NOTE(review): nothing visible in this method drives the runners kept
    # on self._in, self._pipeline and self._out; presumably they have to
    # run before the SMTP layer can have recorded anything -- confirm.
    messages = list(SMTPLayer.smtpd.messages)
    self.assertEqual(len(messages), 1)
    # Bind the single delivered message; every assertion below inspects it.
    message = messages[0]
    # We don't need to test the entire posted message, just the bits that
    # prove it got sent out.
    self.assertIn('x-mailman-version', message)
    self.assertIn('x-peer', message)
    # The X-Mailman-Approved-At header has local timezone information in
    # it, so test that separately: compare everything except the last five
    # characters (presumably the zone offset) and then drop the header.
    self.assertEqual(message['x-mailman-approved-at'][:-5],
                     'Mon, 01 Aug 2005 07:49:23 ')
    del message['x-mailman-approved-at']
    # The Message-ID matches the original.
    self.assertEqual(message['message-id'], '<alpha>')
    # Anne sent the message and the mailing list received it.
    self.assertEqual(message['from'], 'anne@example.com')
    self.assertEqual(message['to'], 'test@example.com')
    # The Subject header has the list's prefix.
    self.assertEqual(message['subject'], '[Test] hold me')
    # The list's -bounce address is the actual sender, and Bart is the
    # only actual recipient.  These headers are added by the testing
    # framework and don't show up in production.  They presumably mirror
    # the RFC 5321 envelope sender and recipient -- confirm.
    self.assertEqual(message['x-mailfrom'], 'test-bounces@example.com')
    self.assertEqual(message['x-rcptto'], 'bart@example.com')
98
def test_hold_action_alias_for_defer(self):
    # For handle_message(), Action.hold is treated the same as
    # Action.defer: after either one the request is still pending and
    # can be looked up under the original Message-ID.
    held_id = hold_message(self._mlist, self._msg)
    for action in (Action.defer, Action.hold):
        handle_message(self._mlist, held_id, action)
        pending_key, _ = self._request_db.get_request(held_id)
        self.assertEqual(pending_key, '<alpha>')
110
def test_lp_1031391(self):
    # Regression test for LP: #1031391.  The LMTP server adds a datetime
    # under msgdata['received_time']; pending requests require string
    # keys and values, so holding such a message used to break them.
    # Here the value must come back from the request unchanged.
    stamp = now()
    held_id = hold_message(self._mlist, self._msg, {'received_time': stamp})
    _, stored = self._request_db.get_request(held_id)
    self.assertEqual(stored['received_time'], stamp)
120
def test_non_preserving_disposition(self):
    # Without preserve=True, a disposed (here: discarded) message is not
    # kept in the message store.
    held_id = hold_message(self._mlist, self._msg)
    handle_message(self._mlist, held_id, Action.discard)
    store = getUtility(IMessageStore)
    leftover = store.get_message_by_id('<alpha>')
    self.assertIsNone(leftover)
127
def test_preserving_disposition(self):
    # With preserve=True, the discarded message can still be fetched
    # from the message store by its Message-ID.
    held_id = hold_message(self._mlist, self._msg)
    handle_message(self._mlist, held_id, Action.discard, preserve=True)
    kept = getUtility(IMessageStore).get_message_by_id('<alpha>')
    self.assertEqual(kept['message-id'], '<alpha>')
135
def test_preserve_and_forward(self):
    # A single disposition may both preserve and forward the message.
    held_id = hold_message(self._mlist, self._msg)
    handle_message(
        self._mlist, held_id, Action.discard,
        preserve=True, forward=['zack@example.com'])
    # Preserved: the message is retrievable from the store.
    kept = getUtility(IMessageStore).get_message_by_id('<alpha>')
    self.assertEqual(kept['message-id'], '<alpha>')
    # Forwarded: exactly one message sits in the virgin queue, addressed
    # to the forwarding recipient.
    queued = get_queue_messages('virgin')
    self.assertEqual(len(queued), 1)
    forwarded = queued[0]
    self.assertEqual(str(forwarded.msg['subject']),
                     'Forward of moderated message')
    self.assertEqual(forwarded.msgdata['recipients'],
                     ['zack@example.com'])