1
# Copyright (C) 2012-2014 by the Free Software Foundation, Inc.
3
# This file is part of GNU Mailman.
5
# GNU Mailman is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free
7
# Software Foundation, either version 3 of the License, or (at your option)
10
# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT
11
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
15
# You should have received a copy of the GNU General Public License along with
16
# GNU Mailman. If not, see <http://www.gnu.org/licenses/>.
18
"""Test schema migrations."""
20
from __future__ import absolute_import, print_function, unicode_literals
24
'TestMigration20120407MigratedData',
25
'TestMigration20120407Schema',
26
'TestMigration20120407UnchangedData',
27
'TestMigration20121015MigratedData',
28
'TestMigration20121015Schema',
29
'TestMigration20130406MigratedData',
30
'TestMigration20130406Schema',
36
from datetime import datetime
37
from operator import attrgetter
38
from pkg_resources import resource_string
39
from sqlite3 import OperationalError
40
from zope.component import getUtility
42
from mailman.interfaces.database import IDatabaseFactory
43
from mailman.interfaces.domain import IDomainManager
44
from mailman.interfaces.archiver import ArchivePolicy
45
from mailman.interfaces.bounce import BounceContext
46
from mailman.interfaces.listmanager import IListManager
47
from mailman.interfaces.mailinglist import IAcceptableAliasSet
48
from mailman.interfaces.nntp import NewsgroupModeration
49
from mailman.interfaces.subscriptions import ISubscriptionService
50
from mailman.model.bans import Ban
51
from mailman.model.bounce import BounceEvent
52
from mailman.testing.helpers import temporary_db
53
from mailman.testing.layers import ConfigLayer
57
@unittest.skip('Migration tests are skipped')
58
class MigrationTestBase(unittest.TestCase):
59
"""Test database migrations."""
64
self._database = getUtility(IDatabaseFactory, 'temporary').create()
67
self._database._cleanup()
69
def _table_missing_present(self, migrations, missing, present):
70
"""The appropriate migrations leave some tables missing and present.
72
:param migrations: Sequence of migrations to load.
73
:param missing: Tables which should be missing.
74
:param present: Tables which should be present.
76
for migration in migrations:
77
self._database.load_migrations(migration)
78
self._database.store.commit()
80
self.assertRaises(OperationalError,
81
self._database.store.execute,
82
'select * from {};'.format(table))
84
self._database.store.execute('select * from {};'.format(table))
86
def _missing_present(self, table, migrations, missing, present):
87
"""The appropriate migrations leave columns missing and present.
89
:param table: The table to test columns from.
90
:param migrations: Sequence of migrations to load.
91
:param missing: Set of columns which should be missing after the
92
migrations are loaded.
93
:param present: Set of columns which should be present after the
94
migrations are loaded.
96
for migration in migrations:
97
self._database.load_migrations(migration)
98
self._database.store.commit()
99
for column in missing:
100
self.assertRaises(DatabaseError,
101
self._database.store.execute,
102
'select {0} from {1};'.format(column, table))
103
self._database.store.rollback()
104
for column in present:
105
# This should not produce an exception. Is there some better test
106
# that we can perform?
107
self._database.store.execute(
108
'select {0} from {1};'.format(column, table))
112
@unittest.skip('Migration tests are skipped')
113
class TestMigration20120407Schema(MigrationTestBase):
114
"""Test column migrations."""
116
def test_pre_upgrade_columns_migration(self):
117
# Test that before the migration, the old table columns are present
118
# and the new database columns are not.
119
self._missing_present('mailinglist',
121
# New columns are missing.
125
'nntp_prefix_subject_too'),
126
# Old columns are present.
129
'archive_volume_frequency',
130
'generic_nonmember_action',
131
'include_list_post_header',
133
'news_prefix_subject_too',
135
self._missing_present('member',
140
def test_post_upgrade_columns_migration(self):
141
# Test that after the migration, the old table columns are missing
142
# and the new database columns are present.
143
self._missing_present('mailinglist',
146
# The old columns are missing.
149
'archive_volume_frequency',
150
'generic_nonmember_action',
151
'include_list_post_header',
153
'news_prefix_subject_too',
155
# The new columns are present.
159
'nntp_prefix_subject_too'))
160
self._missing_present('member',
168
@unittest.skip('Migration tests are skipped')
169
class TestMigration20120407UnchangedData(MigrationTestBase):
170
"""Test non-migrated data."""
173
MigrationTestBase.setUp(self)
174
# Load all the migrations to just before the one we're testing.
175
self._database.load_migrations('20120406999999')
176
# Load the previous schema's sample data.
177
sample_data = resource_string(
178
'mailman.database.tests.data',
179
'migration_{0}_1.sql'.format(self._database.TAG))
180
self._database.load_sql(self._database.store, sample_data)
181
# XXX 2012-12-28: We have to load the last migration defined in the
182
# system, otherwise the ORM model will not match the SQL table
183
# definitions and we'll get OperationalErrors from SQLite.
184
self._database.load_migrations('20121015000000')
186
def test_migration_domains(self):
187
# Test that the domains table, which isn't touched, doesn't change.
188
with temporary_db(self._database):
189
# Check that the domains survived the migration. This table
190
# was not touched so it should be fine.
191
domains = list(getUtility(IDomainManager))
192
self.assertEqual(len(domains), 1)
193
self.assertEqual(domains[0].mail_host, 'example.com')
195
def test_migration_mailing_lists(self):
196
# Test that the mailing lists survive migration.
197
with temporary_db(self._database):
198
# There should be exactly one mailing list defined.
199
mlists = list(getUtility(IListManager).mailing_lists)
200
self.assertEqual(len(mlists), 1)
201
self.assertEqual(mlists[0].fqdn_listname, 'test@example.com')
203
def test_migration_acceptable_aliases(self):
204
# Test that the mailing list's acceptable aliases survive migration.
205
# This proves that foreign key references are migrated properly.
206
with temporary_db(self._database):
207
mlist = getUtility(IListManager).get('test@example.com')
208
aliases_set = IAcceptableAliasSet(mlist)
209
self.assertEqual(set(aliases_set.aliases),
210
set(['foo@example.com', 'bar@example.com']))
212
def test_migration_members(self):
213
# Test that the members of a mailing list all survive migration.
214
with temporary_db(self._database):
215
mlist = getUtility(IListManager).get('test@example.com')
216
# Test that all the members we expect are still there. Start with
217
# the two list delivery members.
218
addresses = set(address.email
219
for address in mlist.members.addresses)
220
self.assertEqual(addresses,
221
set(['anne@example.com', 'bart@example.com']))
222
# There is one owner.
223
owners = set(address.email for address in mlist.owners.addresses)
224
self.assertEqual(len(owners), 1)
225
self.assertEqual(owners.pop(), 'anne@example.com')
226
# There is one moderator.
227
moderators = set(address.email
228
for address in mlist.moderators.addresses)
229
self.assertEqual(len(moderators), 1)
230
self.assertEqual(moderators.pop(), 'bart@example.com')
234
@unittest.skip('Migration tests are skipped')
235
class TestMigration20120407MigratedData(MigrationTestBase):
236
"""Test affected migration data."""
239
MigrationTestBase.setUp(self)
240
# Load all the migrations to just before the one we're testing.
241
self._database.load_migrations('20120406999999')
242
# Load the previous schema's sample data.
243
sample_data = resource_string(
244
'mailman.database.tests.data',
245
'migration_{0}_1.sql'.format(self._database.TAG))
246
self._database.load_sql(self._database.store, sample_data)
249
# XXX 2012-12-28: We have to load the last migration defined in the
250
# system, otherwise the ORM model will not match the SQL table
251
# definitions and we'll get OperationalErrors from SQLite.
252
self._database.load_migrations('20121015000000')
254
def test_migration_archive_policy_never_0(self):
255
# Test that the new archive_policy value is updated correctly. In the
256
# case of old column archive=0, the archive_private column is
257
# ignored. This test sets it to 0 to ensure it's ignored.
258
self._database.store.execute(
259
'UPDATE mailinglist SET archive = {0}, archive_private = {0} '
260
'WHERE id = 1;'.format(self._database.FALSE))
261
# Complete the migration
263
with temporary_db(self._database):
264
mlist = getUtility(IListManager).get('test@example.com')
265
self.assertEqual(mlist.archive_policy, ArchivePolicy.never)
267
def test_migration_archive_policy_never_1(self):
268
# Test that the new archive_policy value is updated correctly. In the
269
# case of old column archive=0, the archive_private column is
270
# ignored. This test sets it to 1 to ensure it's ignored.
271
self._database.store.execute(
272
'UPDATE mailinglist SET archive = {0}, archive_private = {1} '
273
'WHERE id = 1;'.format(self._database.FALSE,
274
self._database.TRUE))
275
# Complete the migration
277
with temporary_db(self._database):
278
mlist = getUtility(IListManager).get('test@example.com')
279
self.assertEqual(mlist.archive_policy, ArchivePolicy.never)
281
def test_archive_policy_private(self):
282
# Test that the new archive_policy value is updated correctly for
284
self._database.store.execute(
285
'UPDATE mailinglist SET archive = {0}, archive_private = {0} '
286
'WHERE id = 1;'.format(self._database.TRUE))
287
# Complete the migration
289
with temporary_db(self._database):
290
mlist = getUtility(IListManager).get('test@example.com')
291
self.assertEqual(mlist.archive_policy, ArchivePolicy.private)
293
def test_archive_policy_public(self):
294
# Test that the new archive_policy value is updated correctly for
296
self._database.store.execute(
297
'UPDATE mailinglist SET archive = {1}, archive_private = {0} '
298
'WHERE id = 1;'.format(self._database.FALSE,
299
self._database.TRUE))
300
# Complete the migration
302
with temporary_db(self._database):
303
mlist = getUtility(IListManager).get('test@example.com')
304
self.assertEqual(mlist.archive_policy, ArchivePolicy.public)
306
def test_list_id(self):
307
# Test that the mailinglist table gets a list_id column.
309
with temporary_db(self._database):
310
mlist = getUtility(IListManager).get('test@example.com')
311
self.assertEqual(mlist.list_id, 'test.example.com')
313
def test_list_id_member(self):
314
# Test that the member table's mailing_list column becomes list_id.
316
with temporary_db(self._database):
317
service = getUtility(ISubscriptionService)
318
members = list(service.find_members(list_id='test.example.com'))
319
self.assertEqual(len(members), 4)
321
def test_news_moderation_none(self):
322
# Test that news_moderation becomes newsgroup_moderation.
323
self._database.store.execute(
324
'UPDATE mailinglist SET news_moderation = 0 '
327
with temporary_db(self._database):
328
mlist = getUtility(IListManager).get('test@example.com')
329
self.assertEqual(mlist.newsgroup_moderation,
330
NewsgroupModeration.none)
332
def test_news_moderation_open_moderated(self):
333
# Test that news_moderation becomes newsgroup_moderation.
334
self._database.store.execute(
335
'UPDATE mailinglist SET news_moderation = 1 '
338
with temporary_db(self._database):
339
mlist = getUtility(IListManager).get('test@example.com')
340
self.assertEqual(mlist.newsgroup_moderation,
341
NewsgroupModeration.open_moderated)
343
def test_news_moderation_moderated(self):
344
# Test that news_moderation becomes newsgroup_moderation.
345
self._database.store.execute(
346
'UPDATE mailinglist SET news_moderation = 2 '
349
with temporary_db(self._database):
350
mlist = getUtility(IListManager).get('test@example.com')
351
self.assertEqual(mlist.newsgroup_moderation,
352
NewsgroupModeration.moderated)
354
def test_nntp_prefix_subject_too_false(self):
355
# Test that news_prefix_subject_too becomes nntp_prefix_subject_too.
356
self._database.store.execute(
357
'UPDATE mailinglist SET news_prefix_subject_too = {0} '
358
'WHERE id = 1;'.format(self._database.FALSE))
360
with temporary_db(self._database):
361
mlist = getUtility(IListManager).get('test@example.com')
362
self.assertFalse(mlist.nntp_prefix_subject_too)
364
def test_nntp_prefix_subject_too_true(self):
365
# Test that news_prefix_subject_too becomes nntp_prefix_subject_too.
366
self._database.store.execute(
367
'UPDATE mailinglist SET news_prefix_subject_too = {0} '
368
'WHERE id = 1;'.format(self._database.TRUE))
370
with temporary_db(self._database):
371
mlist = getUtility(IListManager).get('test@example.com')
372
self.assertTrue(mlist.nntp_prefix_subject_too)
374
def test_allow_list_posts_false(self):
375
# Test that include_list_post_header -> allow_list_posts.
376
self._database.store.execute(
377
'UPDATE mailinglist SET include_list_post_header = {0} '
378
'WHERE id = 1;'.format(self._database.FALSE))
380
with temporary_db(self._database):
381
mlist = getUtility(IListManager).get('test@example.com')
382
self.assertFalse(mlist.allow_list_posts)
384
def test_allow_list_posts_true(self):
385
# Test that include_list_post_header -> allow_list_posts.
386
self._database.store.execute(
387
'UPDATE mailinglist SET include_list_post_header = {0} '
388
'WHERE id = 1;'.format(self._database.TRUE))
390
with temporary_db(self._database):
391
mlist = getUtility(IListManager).get('test@example.com')
392
self.assertTrue(mlist.allow_list_posts)
396
@unittest.skip('Migration tests are skipped')
397
class TestMigration20121015Schema(MigrationTestBase):
398
"""Test column migrations."""
400
def test_pre_upgrade_column_migrations(self):
401
self._missing_present('ban',
405
self._missing_present('mailinglist',
408
('new_member_options', 'send_reminders',
409
'subscribe_policy', 'unsubscribe_policy',
410
'subscribe_auto_approval', 'private_roster',
411
'admin_member_chunksize'),
414
def test_post_upgrade_column_migrations(self):
415
self._missing_present('ban',
420
self._missing_present('mailinglist',
423
('new_member_options', 'send_reminders',
424
'subscribe_policy', 'unsubscribe_policy',
425
'subscribe_auto_approval', 'private_roster',
426
'admin_member_chunksize'),
431
@unittest.skip('Migration tests are skipped')
432
class TestMigration20121015MigratedData(MigrationTestBase):
433
"""Test non-migrated data."""
435
def test_migration_bans(self):
436
# Load all the migrations to just before the one we're testing.
437
self._database.load_migrations('20121014999999')
438
# Insert a list-specific ban.
439
self._database.store.execute("""
440
INSERT INTO ban VALUES (
441
1, 'anne@example.com', 'test@example.com');
443
# Insert a global ban.
444
self._database.store.execute("""
445
INSERT INTO ban VALUES (
446
2, 'bart@example.com', NULL);
448
# Update to the current migration we're testing.
449
self._database.load_migrations('20121015000000')
450
# Now both the local and global bans should still be present.
451
bans = sorted(self._database.store.find(Ban),
452
key=attrgetter('email'))
453
self.assertEqual(bans[0].email, 'anne@example.com')
454
self.assertEqual(bans[0].list_id, 'test.example.com')
455
self.assertEqual(bans[1].email, 'bart@example.com')
456
self.assertEqual(bans[1].list_id, None)
460
@unittest.skip('Migration tests are skipped')
461
class TestMigration20130406Schema(MigrationTestBase):
462
"""Test column migrations."""
464
def test_pre_upgrade_column_migrations(self):
465
self._missing_present('bounceevent',
470
def test_post_upgrade_column_migrations(self):
471
self._missing_present('bounceevent',
477
def test_pre_listarchiver_table(self):
478
self._table_missing_present(['20130405999999'], ('listarchiver',), ())
480
def test_post_listarchiver_table(self):
481
self._table_missing_present(['20130405999999',
488
@unittest.skip('Migration tests are skipped')
489
class TestMigration20130406MigratedData(MigrationTestBase):
490
"""Test migrated data."""
492
def test_migration_bounceevent(self):
493
# Load all migrations to just before the one we're testing.
494
self._database.load_migrations('20130405999999')
495
# Insert a bounce event.
496
self._database.store.execute("""
497
INSERT INTO bounceevent VALUES (
498
1, 'test@example.com', 'anne@example.com',
499
'2013-04-06 21:12:00', '<abc@example.com>',
502
# Update to the current migration we're testing
503
self._database.load_migrations('20130406000000')
504
# The bounce event should exist, but with a list-id instead of a fqdn
506
events = list(self._database.store.find(BounceEvent))
507
self.assertEqual(len(events), 1)
508
self.assertEqual(events[0].list_id, 'test.example.com')
509
self.assertEqual(events[0].email, 'anne@example.com')
510
self.assertEqual(events[0].timestamp, datetime(2013, 4, 6, 21, 12))
511
self.assertEqual(events[0].message_id, '<abc@example.com>')
512
self.assertEqual(events[0].context, BounceContext.normal)
513
self.assertFalse(events[0].processed)