~canonical-livepatch-dependencies/canonical-livepatch-service-dependencies/alembic

« back to all changes in this revision

Viewing changes to tests/test_version_table.py

  • Committer: Free Ekanayaka
  • Author(s): Ondřej Nový
  • Date: 2016-04-21 20:04:57 UTC
  • Revision ID: free.ekanayaka@canonical.com-20160421200457-b4x0zmvn1nhc094p
Tags: upstream-0.8.6
Import upstream version 0.8.6

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
from alembic.testing.fixtures import TestBase
 
2
from alembic.testing import mock
 
3
from alembic.testing import config, eq_, assert_raises, assert_raises_message
 
4
 
 
5
from sqlalchemy import Table, MetaData, Column, String
 
6
from sqlalchemy.engine.reflection import Inspector
 
7
from alembic import migration
 
8
 
 
9
from alembic.util import CommandError
 
10
 
 
11
version_table = Table('version_table', MetaData(),
 
12
                      Column('version_num', String(32), nullable=False))
 
13
 
 
14
 
 
15
def _up(from_, to_, branch_presence_changed=False):
 
16
    return migration.StampStep(
 
17
        from_, to_, True, branch_presence_changed
 
18
    )
 
19
 
 
20
 
 
21
def _down(from_, to_, branch_presence_changed=False):
 
22
    return migration.StampStep(
 
23
        from_, to_, False, branch_presence_changed
 
24
    )
 
25
 
 
26
 
 
27
class TestMigrationContext(TestBase):
 
28
 
 
29
    @classmethod
 
30
    def setup_class(cls):
 
31
        cls.bind = config.db
 
32
 
 
33
    def setUp(self):
 
34
        self.connection = self.bind.connect()
 
35
        self.transaction = self.connection.begin()
 
36
 
 
37
    def tearDown(self):
 
38
        version_table.drop(self.connection, checkfirst=True)
 
39
        self.transaction.rollback()
 
40
        self.connection.close()
 
41
 
 
42
    def make_one(self, **kwargs):
 
43
        return migration.MigrationContext.configure(**kwargs)
 
44
 
 
45
    def get_revision(self):
 
46
        result = self.connection.execute(version_table.select())
 
47
        rows = result.fetchall()
 
48
        if len(rows) == 0:
 
49
            return None
 
50
        eq_(len(rows), 1)
 
51
        return rows[0]['version_num']
 
52
 
 
53
    def test_config_default_version_table_name(self):
 
54
        context = self.make_one(dialect_name='sqlite')
 
55
        eq_(context._version.name, 'alembic_version')
 
56
 
 
57
    def test_config_explicit_version_table_name(self):
 
58
        context = self.make_one(dialect_name='sqlite',
 
59
                                opts={'version_table': 'explicit'})
 
60
        eq_(context._version.name, 'explicit')
 
61
 
 
62
    def test_config_explicit_version_table_schema(self):
 
63
        context = self.make_one(dialect_name='sqlite',
 
64
                                opts={'version_table_schema': 'explicit'})
 
65
        eq_(context._version.schema, 'explicit')
 
66
 
 
67
    def test_get_current_revision_doesnt_create_version_table(self):
 
68
        context = self.make_one(connection=self.connection,
 
69
                                opts={'version_table': 'version_table'})
 
70
        eq_(context.get_current_revision(), None)
 
71
        insp = Inspector(self.connection)
 
72
        assert ('version_table' not in insp.get_table_names())
 
73
 
 
74
    def test_get_current_revision(self):
 
75
        context = self.make_one(connection=self.connection,
 
76
                                opts={'version_table': 'version_table'})
 
77
        version_table.create(self.connection)
 
78
        eq_(context.get_current_revision(), None)
 
79
        self.connection.execute(
 
80
            version_table.insert().values(version_num='revid'))
 
81
        eq_(context.get_current_revision(), 'revid')
 
82
 
 
83
    def test_get_current_revision_error_if_starting_rev_given_online(self):
 
84
        context = self.make_one(connection=self.connection,
 
85
                                opts={'starting_rev': 'boo'})
 
86
        assert_raises(
 
87
            CommandError,
 
88
            context.get_current_revision
 
89
        )
 
90
 
 
91
    def test_get_current_revision_offline(self):
 
92
        context = self.make_one(dialect_name='sqlite',
 
93
                                opts={'starting_rev': 'startrev',
 
94
                                      'as_sql': True})
 
95
        eq_(context.get_current_revision(), 'startrev')
 
96
 
 
97
    def test_get_current_revision_multiple_heads(self):
 
98
        version_table.create(self.connection)
 
99
        context = self.make_one(connection=self.connection,
 
100
                                opts={'version_table': 'version_table'})
 
101
        updater = migration.HeadMaintainer(context, ())
 
102
        updater.update_to_step(_up(None, 'a', True))
 
103
        updater.update_to_step(_up(None, 'b', True))
 
104
        assert_raises_message(
 
105
            CommandError,
 
106
            "Version table 'version_table' has more than one head present; "
 
107
            "please use get_current_heads()",
 
108
            context.get_current_revision
 
109
        )
 
110
 
 
111
    def test_get_heads(self):
 
112
        version_table.create(self.connection)
 
113
        context = self.make_one(connection=self.connection,
 
114
                                opts={'version_table': 'version_table'})
 
115
        updater = migration.HeadMaintainer(context, ())
 
116
        updater.update_to_step(_up(None, 'a', True))
 
117
        updater.update_to_step(_up(None, 'b', True))
 
118
        eq_(context.get_current_heads(), ('a', 'b'))
 
119
 
 
120
    def test_get_heads_offline(self):
 
121
        version_table.create(self.connection)
 
122
        context = self.make_one(connection=self.connection,
 
123
                                opts={
 
124
                                    'starting_rev': 'q',
 
125
                                    'version_table': 'version_table',
 
126
                                    'as_sql': True})
 
127
        eq_(context.get_current_heads(), ('q', ))
 
128
 
 
129
    def test_stamp_api_creates_table(self):
 
130
        context = self.make_one(connection=self.connection)
 
131
        assert (
 
132
            'alembic_version'
 
133
            not in Inspector(self.connection).get_table_names())
 
134
 
 
135
        script = mock.Mock(_stamp_revs=lambda revision, heads: [
 
136
            _up(None, 'a', True),
 
137
            _up(None, 'b', True)
 
138
        ])
 
139
 
 
140
        context.stamp(script, 'b')
 
141
        eq_(context.get_current_heads(), ('a', 'b'))
 
142
        assert (
 
143
            'alembic_version'
 
144
            in Inspector(self.connection).get_table_names())
 
145
 
 
146
 
 
147
class UpdateRevTest(TestBase):
 
148
    __backend__ = True
 
149
 
 
150
    @classmethod
 
151
    def setup_class(cls):
 
152
        cls.bind = config.db
 
153
 
 
154
    def setUp(self):
 
155
        self.connection = self.bind.connect()
 
156
        self.context = migration.MigrationContext.configure(
 
157
            connection=self.connection,
 
158
            opts={"version_table": "version_table"})
 
159
        version_table.create(self.connection)
 
160
        self.updater = migration.HeadMaintainer(self.context, ())
 
161
 
 
162
    def tearDown(self):
 
163
        version_table.drop(self.connection, checkfirst=True)
 
164
        self.connection.close()
 
165
 
 
166
    def _assert_heads(self, heads):
 
167
        eq_(set(self.context.get_current_heads()), set(heads))
 
168
        eq_(self.updater.heads, set(heads))
 
169
 
 
170
    def test_update_none_to_single(self):
 
171
        self.updater.update_to_step(_up(None, 'a', True))
 
172
        self._assert_heads(('a',))
 
173
 
 
174
    def test_update_single_to_single(self):
 
175
        self.updater.update_to_step(_up(None, 'a', True))
 
176
        self.updater.update_to_step(_up('a', 'b'))
 
177
        self._assert_heads(('b',))
 
178
 
 
179
    def test_update_single_to_none(self):
 
180
        self.updater.update_to_step(_up(None, 'a', True))
 
181
        self.updater.update_to_step(_down('a', None, True))
 
182
        self._assert_heads(())
 
183
 
 
184
    def test_add_branches(self):
 
185
        self.updater.update_to_step(_up(None, 'a', True))
 
186
        self.updater.update_to_step(_up('a', 'b'))
 
187
        self.updater.update_to_step(_up(None, 'c', True))
 
188
        self._assert_heads(('b', 'c'))
 
189
        self.updater.update_to_step(_up('c', 'd'))
 
190
        self.updater.update_to_step(_up('d', 'e1'))
 
191
        self.updater.update_to_step(_up('d', 'e2', True))
 
192
        self._assert_heads(('b', 'e1', 'e2'))
 
193
 
 
194
    def test_teardown_branches(self):
 
195
        self.updater.update_to_step(_up(None, 'd1', True))
 
196
        self.updater.update_to_step(_up(None, 'd2', True))
 
197
        self._assert_heads(('d1', 'd2'))
 
198
 
 
199
        self.updater.update_to_step(_down('d1', 'c'))
 
200
        self._assert_heads(('c', 'd2'))
 
201
 
 
202
        self.updater.update_to_step(_down('d2', 'c', True))
 
203
 
 
204
        self._assert_heads(('c',))
 
205
        self.updater.update_to_step(_down('c', 'b'))
 
206
        self._assert_heads(('b',))
 
207
 
 
208
    def test_resolve_merges(self):
 
209
        self.updater.update_to_step(_up(None, 'a', True))
 
210
        self.updater.update_to_step(_up('a', 'b'))
 
211
        self.updater.update_to_step(_up('b', 'c1'))
 
212
        self.updater.update_to_step(_up('b', 'c2', True))
 
213
        self.updater.update_to_step(_up('c1', 'd1'))
 
214
        self.updater.update_to_step(_up('c2', 'd2'))
 
215
        self._assert_heads(('d1', 'd2'))
 
216
        self.updater.update_to_step(_up(('d1', 'd2'), 'e'))
 
217
        self._assert_heads(('e',))
 
218
 
 
219
    def test_unresolve_merges(self):
 
220
        self.updater.update_to_step(_up(None, 'e', True))
 
221
 
 
222
        self.updater.update_to_step(_down('e', ('d1', 'd2')))
 
223
        self._assert_heads(('d2', 'd1'))
 
224
 
 
225
        self.updater.update_to_step(_down('d2', 'c2'))
 
226
        self._assert_heads(('c2', 'd1'))
 
227
 
 
228
    def test_update_no_match(self):
 
229
        self.updater.update_to_step(_up(None, 'a', True))
 
230
        self.updater.heads.add('x')
 
231
        assert_raises_message(
 
232
            CommandError,
 
233
            "Online migration expected to match one row when updating "
 
234
            "'x' to 'b' in 'version_table'; 0 found",
 
235
            self.updater.update_to_step, _up('x', 'b')
 
236
        )
 
237
 
 
238
    def test_update_multi_match(self):
 
239
        self.connection.execute(version_table.insert(), version_num='a')
 
240
        self.connection.execute(version_table.insert(), version_num='a')
 
241
 
 
242
        self.updater.heads.add('a')
 
243
        assert_raises_message(
 
244
            CommandError,
 
245
            "Online migration expected to match one row when updating "
 
246
            "'a' to 'b' in 'version_table'; 2 found",
 
247
            self.updater.update_to_step, _up('a', 'b')
 
248
        )
 
249
 
 
250
    def test_delete_no_match(self):
 
251
        self.updater.update_to_step(_up(None, 'a', True))
 
252
 
 
253
        self.updater.heads.add('x')
 
254
        assert_raises_message(
 
255
            CommandError,
 
256
            "Online migration expected to match one row when "
 
257
            "deleting 'x' in 'version_table'; 0 found",
 
258
            self.updater.update_to_step, _down('x', None, True)
 
259
        )
 
260
 
 
261
    def test_delete_multi_match(self):
 
262
        self.connection.execute(version_table.insert(), version_num='a')
 
263
        self.connection.execute(version_table.insert(), version_num='a')
 
264
 
 
265
        self.updater.heads.add('a')
 
266
        assert_raises_message(
 
267
            CommandError,
 
268
            "Online migration expected to match one row when "
 
269
            "deleting 'a' in 'version_table'; 2 found",
 
270
            self.updater.update_to_step, _down('a', None, True)
 
271
        )
 
272