40
41
from mailman.interfaces.mailinglist import IAcceptableAliasSet
41
42
from mailman.interfaces.nntp import NewsgroupModeration
42
43
from mailman.interfaces.subscriptions import ISubscriptionService
44
from mailman.model.bans import Ban
43
45
from mailman.testing.helpers import temporary_db
44
46
from mailman.testing.layers import ConfigLayer
48
50
class MigrationTestBase(unittest.TestCase):
49
"""Test the dated migration (LP: #971013)
54
* news_moderation -> newsgroup_moderation
55
* news_prefix_subject_too -> nntp_prefix_subject_too
56
* include_list_post_header -> allow_list_posts
60
* REMOVE archive_private
61
* REMOVE archive_volume_frequency
65
* mailing_list -> list_id
51
"""Test database migrations."""
68
53
layer = ConfigLayer
73
58
def tearDown(self):
    """Invoke the test database's ``_cleanup()`` hook after each test."""
    # NOTE(review): assumes setUp (not visible in this view) has assigned
    # self._database -- confirm.
    self._database._cleanup()
61
def _missing_present(self, table, migrations, missing, present):
    """The appropriate migrations leave columns missing and present.

    :param table: The table to test columns from.
    :param migrations: Sequence of migrations to load.
    :param missing: Set of columns which should be missing after the
        migrations are loaded.
    :param present: Set of columns which should be present after the
        migrations are loaded.
    """
    for migration in migrations:
        self._database.load_migrations(migration)
        self._database.store.commit()
    for column in missing:
        # Selecting a column that does not exist must raise.
        self.assertRaises(DatabaseError,
                          self._database.store.execute,
                          'select {0} from {1};'.format(column, table))
        # Discard the failed statement's transaction before the next query.
        self._database.store.rollback()
    for column in present:
        # This should not produce an exception.  Is there some better test
        # that we can perform?
        self._database.store.execute(
            'select {0} from {1};'.format(column, table))
78
87
class TestMigration20120407Schema(MigrationTestBase):
81
90
def test_pre_upgrade_columns_migration(self):
82
91
# Test that before the migration, the old table columns are present
83
92
# and the new database columns are not.
85
# Load all the migrations to just before the one we're testing.
86
self._database.load_migrations('20120406999999')
87
self._database.store.commit()
88
# Verify that the database has not yet been migrated.
89
for missing in ('allow_list_posts',
92
'nntp_prefix_subject_too'):
93
self.assertRaises(DatabaseError,
94
self._database.store.execute,
95
'select {0} from mailinglist;'.format(missing))
96
self._database.store.rollback()
97
self.assertRaises(DatabaseError,
98
self._database.store.execute,
99
'select list_id from member;')
100
self._database.store.rollback()
101
for present in ('archive',
103
'archive_volume_frequency',
104
'generic_nonmember_action',
105
'include_list_post_header',
107
'news_prefix_subject_too',
109
# This should not produce an exception. Is there some better test
110
# that we can perform?
111
self._database.store.execute(
112
'select {0} from mailinglist;'.format(present))
113
# Again, this should not produce an exception.
114
self._database.store.execute('select mailing_list from member;')
93
self._missing_present('mailinglist',
95
# New columns are missing.
99
'nntp_prefix_subject_too'),
100
# Old columns are present.
103
'archive_volume_frequency',
104
'generic_nonmember_action',
105
'include_list_post_header',
107
'news_prefix_subject_too',
109
self._missing_present('member',
116
114
def test_post_upgrade_columns_migration(self):
117
115
# Test that after the migration, the old table columns are missing
118
116
# and the new database columns are present.
120
# Load all the migrations up to and including the one we're testing.
121
self._database.load_migrations('20120406999999')
122
self._database.load_migrations('20120407000000')
123
# Verify that the database has been migrated.
124
for present in ('allow_list_posts',
127
'nntp_prefix_subject_too'):
128
# This should not produce an exception. Is there some better test
129
# that we can perform?
130
self._database.store.execute(
131
'select {0} from mailinglist;'.format(present))
132
self._database.store.execute('select list_id from member;')
133
for missing in ('archive',
135
'archive_volume_frequency',
136
'generic_nonmember_action',
137
'include_list_post_header',
139
'news_prefix_subject_too',
141
self.assertRaises(DatabaseError,
142
self._database.store.execute,
143
'select {0} from mailinglist;'.format(missing))
144
self._database.store.rollback()
145
self.assertRaises(DatabaseError,
146
self._database.store.execute,
147
'select mailing_list from member;')
117
self._missing_present('mailinglist',
120
# The old columns are missing.
123
'archive_volume_frequency',
124
'generic_nonmember_action',
125
'include_list_post_header',
127
'news_prefix_subject_too',
129
# The new columns are present.
133
'nntp_prefix_subject_too'))
134
self._missing_present('member',
367
358
with temporary_db(self._database):
368
359
mlist = getUtility(IListManager).get('test@example.com')
369
360
self.assertTrue(mlist.allow_list_posts)
364
class TestMigration20121015Schema(MigrationTestBase):
365
"""Test column migrations."""
367
def test_pre_upgrade_column_migrations(self):
368
self._missing_present('ban',
373
def test_post_upgrade_column_migrations(self):
374
self._missing_present('ban',
381
class TestMigration20121015MigratedData(MigrationTestBase):
    """Test that existing ban rows survive the 20121015000000 migration."""

    def test_migration_bans(self):
        """Both a list-specific and a global ban are kept by the migration.

        The list-specific ban is inserted with 'test@example.com' and is
        expected to come back with a `list_id` of 'test.example.com'; the
        global ban is inserted with NULL and is expected to stay None.
        """
        # Load all the migrations to just before the one we're testing.
        self._database.load_migrations('20121014999999')
        # Insert a list-specific ban.
        # NOTE(review): positional VALUES -- presumably (id, email,
        # mailing_list) in the pre-migration schema; confirm.
        self._database.store.execute("""
            INSERT INTO ban VALUES (
                1, 'anne@example.com', 'test@example.com');
            """)
        # Insert a global ban.
        self._database.store.execute("""
            INSERT INTO ban VALUES (
                2, 'bart@example.com', NULL);
            """)
        # Update to the current migration we're testing.
        self._database.load_migrations('20121015000000')
        # Now both the local and global bans should still be present.
        bans = sorted(self._database.store.find(Ban),
                      key=attrgetter('email'))
        # Fail with a clear message rather than an IndexError below if a
        # row was lost.
        self.assertEqual(len(bans), 2)
        self.assertEqual(bans[0].email, 'anne@example.com')
        self.assertEqual(bans[0].list_id, 'test.example.com')
        self.assertEqual(bans[1].email, 'bart@example.com')
        self.assertIsNone(bans[1].list_id)