~barry/mailman/events-and-web

« back to all changes in this revision

Viewing changes to src/mailman/archiving/tests/test_prototype.py

  • Committer: klm
  • Date: 1998-01-07 21:21:35 UTC
  • Revision ID: vcs-imports@canonical.com-19980107212135-sv0y521ps0xye37r
Initial revision

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2012 by the Free Software Foundation, Inc.
2
 
#
3
 
# This file is part of GNU Mailman.
4
 
#
5
 
# GNU Mailman is free software: you can redistribute it and/or modify it under
6
 
# the terms of the GNU General Public License as published by the Free
7
 
# Software Foundation, either version 3 of the License, or (at your option)
8
 
# any later version.
9
 
#
10
 
# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT
11
 
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12
 
# FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
13
 
# more details.
14
 
#
15
 
# You should have received a copy of the GNU General Public License along with
16
 
# GNU Mailman.  If not, see <http://www.gnu.org/licenses/>.
17
 
 
18
 
"""Test the prototype archiver."""
19
 
 
20
 
from __future__ import absolute_import, print_function, unicode_literals
21
 
 
22
 
__metaclass__ = type
23
 
__all__ = [
24
 
    'TestPrototypeArchiver',
25
 
    ]
26
 
 
27
 
 
28
 
import os
29
 
import shutil
30
 
import tempfile
31
 
import unittest
32
 
import threading
33
 
 
34
 
from email import message_from_file
35
 
from flufl.lock import Lock
36
 
 
37
 
from mailman.app.lifecycle import create_list
38
 
from mailman.archiving.prototype import Prototype
39
 
from mailman.config import config
40
 
from mailman.database.transaction import transaction
41
 
from mailman.testing.helpers import LogFileMark
42
 
from mailman.testing.helpers import (
43
 
    specialized_message_from_string as mfs)
44
 
from mailman.testing.layers import ConfigLayer
45
 
from mailman.utilities.email import add_message_hash
46
 
 
47
 
 
48
 
class TestPrototypeArchiver(unittest.TestCase):
49
 
    """Test the prototype archiver."""
50
 
 
51
 
    layer = ConfigLayer
52
 
 
53
 
    def setUp(self):
54
 
        # Create a fake mailing list and message object
55
 
        self._msg = mfs("""\
56
 
To: test@example.com
57
 
From: anne@example.com
58
 
Subject: Testing the test list
59
 
Message-ID: <ant>
60
 
X-Message-ID-Hash: MS6QLWERIJLGCRF44J7USBFDELMNT2BW
61
 
 
62
 
Tests are better than no tests
63
 
but the water deserves to be swum.
64
 
""")
65
 
        with transaction():
66
 
            self._mlist = create_list('test@example.com')
67
 
        # Set up a temporary directory for the prototype archiver so that it's
68
 
        # easier to clean up.
69
 
        self._tempdir = tempfile.mkdtemp()
70
 
        config.push('prototype', """
71
 
        [paths.testing]
72
 
        archive_dir: {0}
73
 
        """.format(self._tempdir))
74
 
        # Capture the structure of a maildir.
75
 
        self._expected_dir_structure = set(
76
 
            (os.path.join(config.ARCHIVE_DIR, path) for path in (
77
 
                'prototype',
78
 
                os.path.join('prototype', self._mlist.fqdn_listname),
79
 
                os.path.join('prototype', self._mlist.fqdn_listname, 'cur'),
80
 
                os.path.join('prototype', self._mlist.fqdn_listname, 'new'),
81
 
                os.path.join('prototype', self._mlist.fqdn_listname, 'tmp'),
82
 
                )))
83
 
        self._expected_dir_structure.add(config.ARCHIVE_DIR)
84
 
 
85
 
    def tearDown(self):
86
 
        shutil.rmtree(self._tempdir)
87
 
        config.pop('prototype')
88
 
 
89
 
    def _find(self, path):
90
 
        all_filenames = set()
91
 
        for dirpath, dirnames, filenames in os.walk(path):
92
 
            if not isinstance(dirpath, unicode):
93
 
                dirpath = unicode(dirpath)
94
 
            all_filenames.add(dirpath)
95
 
            for filename in filenames:
96
 
                new_filename = filename
97
 
                if not isinstance(filename, unicode):
98
 
                    new_filename = unicode(filename)
99
 
                all_filenames.add(os.path.join(dirpath, new_filename))
100
 
        return all_filenames
101
 
 
102
 
    def test_archive_maildir_created(self):
103
 
        # Archiving a message to the prototype archiver should create the
104
 
        # expected directory structure.
105
 
        Prototype.archive_message(self._mlist, self._msg)
106
 
        all_filenames = self._find(config.ARCHIVE_DIR)
107
 
        # Check that the directory structure has been created and we have one
108
 
        # more file (the archived message) than expected directories.
109
 
        archived_messages = all_filenames - self._expected_dir_structure
110
 
        self.assertEqual(len(archived_messages), 1)
111
 
        self.assertTrue(
112
 
            archived_messages.pop().startswith(
113
 
                os.path.join(config.ARCHIVE_DIR, 'prototype',
114
 
                             self._mlist.fqdn_listname, 'new')))
115
 
 
116
 
    def test_archive_maildir_existence_does_not_raise(self):
117
 
        # Archiving a second message does not cause an EEXIST to be raised
118
 
        # when a second message is archived.
119
 
        new_dir = None
120
 
        Prototype.archive_message(self._mlist, self._msg)
121
 
        for directory in ('cur', 'new', 'tmp'):
122
 
            path = os.path.join(config.ARCHIVE_DIR, 'prototype',
123
 
                                self._mlist.fqdn_listname, directory)
124
 
            if directory == 'new':
125
 
                new_dir = path
126
 
            self.assertTrue(os.path.isdir(path))
127
 
        # There should be one message in the 'new' directory.
128
 
        self.assertEqual(len(os.listdir(new_dir)), 1)
129
 
        # Archive a second message.  If an exception occurs, let it fail the
130
 
        # test.  Afterward, two messages should be in the 'new' directory.
131
 
        del self._msg['message-id']
132
 
        del self._msg['x-message-id-hash']
133
 
        self._msg['Message-ID'] = '<bee>'
134
 
        add_message_hash(self._msg)
135
 
        Prototype.archive_message(self._mlist, self._msg)
136
 
        self.assertEqual(len(os.listdir(new_dir)), 2)
137
 
 
138
 
    def test_archive_lock_used(self):
139
 
        # Test that locking the maildir when adding works as a failure here
140
 
        # could mean we lose mail.
141
 
        lock_file = os.path.join(
142
 
            config.LOCK_DIR, '{0}-maildir.lock'.format(
143
 
                self._mlist.fqdn_listname))
144
 
        with Lock(lock_file):
145
 
            # Acquire the archiver lock, then make sure the archiver logs the
146
 
            # fact that it could not acquire the lock.
147
 
            archive_thread = threading.Thread(
148
 
                target=Prototype.archive_message,
149
 
                args=(self._mlist, self._msg))
150
 
            mark = LogFileMark('mailman.error')
151
 
            archive_thread.run()
152
 
            # Test that the archiver output the correct error.
153
 
            line = mark.readline()
154
 
            # XXX 2012-03-15 BAW: we really should remove timestamp prefixes
155
 
            # from the loggers when under test.
156
 
            self.assertTrue(line.endswith(
157
 
                'Unable to acquire prototype archiver lock for {0}, '
158
 
                'discarding: {1}\n'.format(
159
 
                    self._mlist.fqdn_listname,
160
 
                    self._msg.get('message-id'))))
161
 
        # Check that the message didn't get archived.
162
 
        created_files = self._find(config.ARCHIVE_DIR)
163
 
        self.assertEqual(self._expected_dir_structure, created_files)
164
 
 
165
 
    def test_prototype_archiver_good_path(self):
166
 
        # Verify the good path; the message gets archived.
167
 
        Prototype.archive_message(self._mlist, self._msg)
168
 
        new_path = os.path.join(
169
 
            config.ARCHIVE_DIR, 'prototype', self._mlist.fqdn_listname, 'new')
170
 
        archived_messages = list(os.listdir(new_path))
171
 
        self.assertEqual(len(archived_messages), 1)
172
 
        # Check that the email has been added.
173
 
        with open(os.path.join(new_path, archived_messages[0])) as fp:
174
 
            archived_message = message_from_file(fp)
175
 
        self.assertEqual(self._msg.as_string(), archived_message.as_string())