1
# Copyright 2010 Canonical Ltd.
3
# This file is part of desktopcouch.
5
# desktopcouch is free software: you can redistribute it and/or modify
6
# it under the terms of the GNU Lesser General Public License version 3
7
# as published by the Free Software Foundation.
9
# desktopcouch is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12
# GNU Lesser General Public License for more details.
14
# You should have received a copy of the GNU Lesser General Public License
15
# along with desktopcouch. If not, see <http://www.gnu.org/licenses/>.
17
# Authors: Eric Casteleijn <eric.casteleijn@canonical.com>
18
# Vincenzo Di Somma <vincenzo.di.somma@canonical.com>
20
"""Tests for migration infrastructure"""
22
#pylint: disable=W0212
23
#tests can access private attributes
25
from couchdb.http import ResourceNotFound
26
from uuid import uuid4
28
import desktopcouch.application.tests as test_environment
30
from desktopcouch.application import migration
31
from desktopcouch.application.tests import TestCase
32
from desktopcouch.application.server import DesktopDatabase
33
from desktopcouch.records import Record, NoRecordTypeSpecified
34
from desktopcouch.records.database import DCTRASH, base_n
35
from desktopcouch.application.stop_local_couchdb import stop_couchdb
38
def get_test_context():
    """Return the test context exposed by the test package.

    The attribute is looked up on ``test_environment`` at call time, so the
    caller always gets whatever context the package currently holds.
    """
    return test_environment.test_context
def fake_migration(database=None):
    """fake migration code"""
    # NOTE(review): the ``def`` line of this function was missing from the
    # reviewed copy of the file; only the docstring survived.  The name is
    # taken from its use in the registration test, and the optional
    # ``database`` parameter mirrors the one-argument shape of the other
    # migration callables in this module -- confirm against the original.
    # The function is only ever registered, never expected to do anything.
48
class MigrationBase(TestCase):
    """Shared fixture for the migration tests.

    Every test gets its own database, named after the test method, and the
    module-level migration registry is copied in ``setUp`` and put back in
    ``tearDown`` so that registrations made by one test do not leak into
    the next.
    """

    # NOTE(review): the ``def setUp``/``def tearDown`` headers and the
    # ``try:``/``pass`` lines were missing from the reviewed copy of this
    # file (which had also lost its indentation).  They are restored here
    # from the surviving ``super()`` calls, docstring and ``except`` clause.

    def setUp(self):
        """Create the per-test database and remember the registry."""
        # Slice-copy so later rebinding/mutation cannot affect the snapshot.
        self.old_registry = migration.MIGRATIONS_REGISTRY[:]
        super(MigrationBase, self).setUp()
        self.ctx = get_test_context()
        self.dbname = self._testMethodName
        self.database = DesktopDatabase(self.dbname, create=True, ctx=self.ctx)
        self.server = self.database._server

    def tearDown(self):
        """tear down each test"""
        del self.database._server[self.database._database_name]
        try:
            del self.database._server[DCTRASH]
        except ResourceNotFound:
            # The trash database may not exist; nothing to clean up then.
            pass
        this_context = get_test_context()
        if this_context != test_environment.test_context:
            stop_couchdb(ctx=this_context)
        super(MigrationBase, self).tearDown()
        # Restore the registry exactly as it was before the test ran.
        migration.MIGRATIONS_REGISTRY = self.old_registry[:]
74
class TestRegistration(MigrationBase):
    """Tests the registration of a migration script"""

    # NOTE(review): the ``def setUp``/``def tearDown`` headers and the
    # ``try:``/``pass`` lines were missing from the reviewed copy of this
    # file; they are restored from the surviving ``super()`` calls,
    # docstring and ``except`` clause.

    def setUp(self):
        """Create one extra database next to the per-test one."""
        super(TestRegistration, self).setUp()
        self.one_more_database = DesktopDatabase(
            'one_more_database', create=True, ctx=self.ctx)

    def tearDown(self):
        """tear down each test"""
        del self.database._server[self.one_more_database._database_name]
        try:
            del self.database._server[DCTRASH]
        except ResourceNotFound:
            # The trash database may not exist; nothing to clean up then.
            pass
        super(TestRegistration, self).tearDown()

    def test_register_migration_is_added_to_the_registry(self):
        """Test that the migration script is correctly registered."""
        # Start from an empty registry so the expected content is exact;
        # the base class tearDown restores the previous registry.
        migration.MIGRATIONS_REGISTRY = []
        migration.register(migration_method=fake_migration, dbs=[self.dbname])
        self.assertEqual([{'method': fake_migration, 'dbs': [self.dbname]}],
                         migration.MIGRATIONS_REGISTRY)
        self.assertEqual(1, len(migration.MIGRATIONS_REGISTRY))
101
class TestMigration(MigrationBase):
    """Tests the actual migration script"""

    # NOTE(review): the reviewed copy of this file had lost its indentation
    # and a number of lines (method headers, ``try:``/``pass``/``continue``,
    # the opening line of some calls and a few fixture literal lines).  The
    # structure below is restored from what survived; every spot where
    # content had to be reconstructed rather than re-indented carries its
    # own NOTE(review) and should be checked against the original file.

    def setUp(self):
        """Start every test with a freshly created, empty trash database."""
        super(TestMigration, self).setUp()
        try:
            del self.database._server[DCTRASH]
        except ResourceNotFound:
            # No trash database left over from an earlier test.
            pass
        self.trash = DesktopDatabase(DCTRASH, create=True, ctx=self.ctx)

        # Snapshot the ids first so that deleting while walking the
        # database cannot disturb the iteration.
        for document_id in list(self.trash.db):
            # remove documents added from other tests.
            del self.trash.db[document_id]

    def tearDown(self):
        """tear down each test"""
        super(TestMigration, self).tearDown()
        try:
            del self.database._server[DCTRASH]
        except ResourceNotFound:
            # Already removed (the base class tearDown also tries this).
            pass

    def test_migration_script_is_run(self):
        """Test that the migration script is run."""

        def simple_migration(database):
            """simple migration code"""
            # just asserting something to check this code is run
            # and get the right db as argument
            self.assertEqual(self.dbname, database.db.name)

        # NOTE(review): the ``migration.register(`` opener was missing; it
        # is restored to match the keyword call used in TestRegistration.
        migration.register(
            migration_method=simple_migration, dbs=[self.dbname])
        migration.run_needed_migrations(ctx=self.ctx)

    def test_migration_script_is_run_and_can_access_view(self):
        """Test that a migration can add a view and query it."""

        simple_view_code = 'function(doc){emit(doc._id, doc.record_type)}'

        def simple_migration(database):
            """simple migration code"""
            # NOTE(review): the ``database.add_view(`` opener and the first
            # argument of ``execute_view`` were missing; both are inferred
            # from the surviving arguments -- confirm.
            database.add_view(
                'simple_view', map_js=simple_view_code,
                design_doc=migration.MIGRATION_DESIGN_DOCUMENT)
            results = database.execute_view(
                'simple_view',
                design_doc='dc_migration')
            # One row per record stored below.
            self.assertEqual(3, len(results))

        self.database.put_record(Record({
            'key1_1': 'val1_1', 'record_type': 'test.com'}))
        self.database.put_record(Record({
            'key2_1': 'val2_1', 'record_type': 'test.com'}))
        self.database.put_record(Record({
            'key13_1': 'va31_1', 'record_type': 'test.com'}))

        # NOTE(review): ``migration.register(`` opener restored, as above.
        migration.register(
            migration_method=simple_migration, dbs=[self.dbname])
        migration.run_needed_migrations(ctx=self.ctx)

    def test_migration_deleted_flag_to_trash(self):
        """Test that records flagged as deleted are moved to the trash."""
        # NOTE(review): in each fixture below the ``Record({`` opener, one
        # payload line and the ``'desktopcouch'`` wrapper were missing.  The
        # nesting is fixed by the surviving closing brackets and by the
        # ``['desktopcouch']`` lookup at the end of this test; the payload
        # keys reuse the ones from the view test above and are a guess.
        record_id = self.database.put_record(
            Record({
                'key1_1': 'val1_1',
                'record_type': 'test.com',
                'application_annotations': {
                    'desktopcouch': {
                        'private_application_annotations': {
                            'deleted': True}}}}))
        undeleted_record_id1 = self.database.put_record(
            Record({
                'key2_1': 'val2_1',
                'record_type': 'test.com',
                'application_annotations': {
                    'desktopcouch': {
                        'private_application_annotations': {
                            'deleted': False}}}}))
        undeleted_record_id2 = self.database.put_record(
            Record({
                'key13_1': 'va31_1',
                'record_type': 'test.com'}))

        migration.run_needed_migrations(ctx=self.ctx)
        # Record no longer exists in database
        self.assertIs(None, self.database.get_record(record_id))
        # Undeleted records still exist
        self.assertIsNot(None, self.database.get_record(undeleted_record_id1))
        self.assertIsNot(None, self.database.get_record(undeleted_record_id2))

        # Known deleted record is only record in trash
        for document_id in self.trash.db:
            try:
                record = self.trash.get_record(document_id)
            except NoRecordTypeSpecified:
                # NOTE(review): the handler body was missing; skipping
                # documents that are not records is assumed -- confirm.
                continue
            private = record.application_annotations['desktopcouch'][
                'private_application_annotations']
            self.assertEqual(self.dbname, private['original_database_name'])
            self.assertEqual(record_id, private['original_id'])

    def test_migration_in_face_of_broken_records(self):
        """Test that the migration does not break when we have a
        'record' without a record_type.

        """
        # NOTE(review): most literal lines of the three documents below
        # were missing.  Only the ``application_annotations`` and
        # ``private_application_annotations`` keys survived; the payload
        # keys, the ``desktopcouch`` wrapper and the ``deleted`` values
        # mirror the fixtures of the previous test and must be confirmed.
        # pylint: disable=E1101
        record_id = base_n(uuid4().int, 62)
        self.database.db[record_id] = {
            "key1_1": "val1_1",
            "application_annotations": {
                "desktopcouch": {
                    "private_application_annotations": {
                        "deleted": True}}}}
        undeleted_record_id1 = base_n(uuid4().int, 62)
        self.database.db[undeleted_record_id1] = {
            'key2_1': 'val2_1',
            'application_annotations': {
                'desktopcouch': {
                    'private_application_annotations': {
                        'deleted': False}}}}
        undeleted_record_id2 = base_n(uuid4().int, 62)
        self.database.db[undeleted_record_id2] = {
            'key13_1': 'va31_1'}
        # pylint: enable=E1101
        migration.run_needed_migrations(ctx=self.ctx)
        # None of them are deleted, since they are not records
        self.assertIn(record_id, self.database.db)
        self.assertIn(undeleted_record_id1, self.database.db)
        self.assertIn(undeleted_record_id2, self.database.db)
        self.assertNotIn(record_id, self.trash.db)
        self.assertNotIn(undeleted_record_id1, self.trash.db)
        self.assertNotIn(undeleted_record_id2, self.trash.db)