1
# Copyright (C) 2012 by the Free Software Foundation, Inc.
3
# This file is part of GNU Mailman.
5
# GNU Mailman is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free
7
# Software Foundation, either version 3 of the License, or (at your option)
# any later version.
10
# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT
11
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12
# FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
# more details.
15
# You should have received a copy of the GNU General Public License along with
16
# GNU Mailman. If not, see <http://www.gnu.org/licenses/>.
18
"""Test the prototype archiver."""
20
from __future__ import absolute_import, print_function, unicode_literals
24
'TestPrototypeArchiver',
34
from email import message_from_file
35
from flufl.lock import Lock
37
from mailman.app.lifecycle import create_list
38
from mailman.archiving.prototype import Prototype
39
from mailman.config import config
40
from mailman.database.transaction import transaction
41
from mailman.testing.helpers import LogFileMark
42
from mailman.testing.helpers import (
43
specialized_message_from_string as mfs)
44
from mailman.testing.layers import ConfigLayer
45
from mailman.utilities.email import add_message_hash
48
class TestPrototypeArchiver(unittest.TestCase):
49
"""Test the prototype archiver."""
54
# Create a fake mailing list and message object
57
From: anne@example.com
58
Subject: Testing the test list
60
X-Message-ID-Hash: MS6QLWERIJLGCRF44J7USBFDELMNT2BW
62
Tests are better than no tests
63
but the water deserves to be swum.
66
self._mlist = create_list('test@example.com')
67
# Set up a temporary directory for the prototype archiver so that it's
69
self._tempdir = tempfile.mkdtemp()
70
config.push('prototype', """
73
""".format(self._tempdir))
74
# Capture the structure of a maildir.
75
self._expected_dir_structure = set(
76
(os.path.join(config.ARCHIVE_DIR, path) for path in (
78
os.path.join('prototype', self._mlist.fqdn_listname),
79
os.path.join('prototype', self._mlist.fqdn_listname, 'cur'),
80
os.path.join('prototype', self._mlist.fqdn_listname, 'new'),
81
os.path.join('prototype', self._mlist.fqdn_listname, 'tmp'),
83
self._expected_dir_structure.add(config.ARCHIVE_DIR)
86
shutil.rmtree(self._tempdir)
87
config.pop('prototype')
89
def _find(self, path):
91
for dirpath, dirnames, filenames in os.walk(path):
92
if not isinstance(dirpath, unicode):
93
dirpath = unicode(dirpath)
94
all_filenames.add(dirpath)
95
for filename in filenames:
96
new_filename = filename
97
if not isinstance(filename, unicode):
98
new_filename = unicode(filename)
99
all_filenames.add(os.path.join(dirpath, new_filename))
102
def test_archive_maildir_created(self):
    # Archiving a message to the prototype archiver should create the
    # expected directory structure.
    Prototype.archive_message(self._mlist, self._msg)
    all_filenames = self._find(config.ARCHIVE_DIR)
    # Check that the directory structure has been created and we have one
    # more file (the archived message) than expected directories.
    archived_messages = all_filenames - self._expected_dir_structure
    self.assertEqual(len(archived_messages), 1)
    # The single extra path must live in the maildir's 'new' directory.
    # The result of startswith() has to be asserted explicitly; otherwise
    # a misplaced message would go unnoticed.
    new_dir = os.path.join(
        config.ARCHIVE_DIR, 'prototype', self._mlist.fqdn_listname, 'new')
    self.assertTrue(archived_messages.pop().startswith(new_dir))
116
def test_archive_maildir_existence_does_not_raise(self):
    # Archiving a second message into an already existing maildir does not
    # cause an EEXIST to be raised.
    Prototype.archive_message(self._mlist, self._msg)
    maildir = os.path.join(
        config.ARCHIVE_DIR, 'prototype', self._mlist.fqdn_listname)
    # The first archive call must have created all three maildir folders.
    for directory in ('cur', 'new', 'tmp'):
        self.assertTrue(os.path.isdir(os.path.join(maildir, directory)))
    new_dir = os.path.join(maildir, 'new')
    # There should be one message in the 'new' directory.
    self.assertEqual(len(os.listdir(new_dir)), 1)
    # Archive a second message.  If an exception occurs, let it fail the
    # test.  Afterward, two messages should be in the 'new' directory.
    # The message gets a different Message-ID, and the old hash header is
    # removed before add_message_hash() is called on it again.
    del self._msg['message-id']
    del self._msg['x-message-id-hash']
    self._msg['Message-ID'] = '<bee>'
    add_message_hash(self._msg)
    Prototype.archive_message(self._mlist, self._msg)
    self.assertEqual(len(os.listdir(new_dir)), 2)
138
def test_archive_lock_used(self):
    # Test that locking the maildir when adding works as a failure here
    # could mean we lose mail.
    lock_file = os.path.join(
        config.LOCK_DIR, '{0}-maildir.lock'.format(
            self._mlist.fqdn_listname))
    with Lock(lock_file):
        # Acquire the archiver lock, then make sure the archiver logs the
        # fact that it could not acquire the lock.
        archive_thread = threading.Thread(
            target=Prototype.archive_message,
            args=(self._mlist, self._msg))
        # Mark the log before the archiver runs so that only output
        # produced by this attempt is read back below.
        mark = LogFileMark('mailman.error')
        # Thread.run() invokes the target synchronously in the calling
        # thread, so the archiver has finished (and logged) by the time it
        # returns.  Without this call the archiver would never be invoked.
        archive_thread.run()
    # Test that the archiver output the correct error.
    line = mark.readline()
    # XXX 2012-03-15 BAW: we really should remove timestamp prefixes
    # from the loggers when under test.
    self.assertTrue(line.endswith(
        'Unable to acquire prototype archiver lock for {0}, '
        'discarding: {1}\n'.format(
            self._mlist.fqdn_listname,
            self._msg.get('message-id'))))
    # Check that the message didn't get archived.
    created_files = self._find(config.ARCHIVE_DIR)
    self.assertEqual(self._expected_dir_structure, created_files)
165
def test_prototype_archiver_good_path(self):
    # The good path: after archiving, exactly one message sits in the
    # maildir's 'new' directory and it matches the message we archived.
    Prototype.archive_message(self._mlist, self._msg)
    maildir_new = os.path.join(
        config.ARCHIVE_DIR, 'prototype', self._mlist.fqdn_listname, 'new')
    stored = os.listdir(maildir_new)
    self.assertEqual(len(stored), 1)
    # Parse the stored file and compare it against the original message.
    stored_path = os.path.join(maildir_new, stored[0])
    with open(stored_path) as fp:
        parsed = message_from_file(fp)
    self.assertEqual(self._msg.as_string(), parsed.as_string())