# Copyright 2010-2011 OpenStack Foundation
# Copyright 2012-2013 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import functools
import os
import subprocess

import lockfile
from six import moves
import sqlalchemy

from ironic.openstack.common.gettextutils import _
from ironic.openstack.common import log as logging
from ironic.openstack.common.py3kcompat import urlutils
from ironic.openstack.common import test
31
LOG = logging.getLogger(__name__)
34
def _get_connect_string(backend, user, passwd, database):
35
"""Get database connection
37
Try to get a connection with a very specific set of values, if we get
38
these then we'll run the tests, otherwise they are skipped
40
if backend == "postgres":
41
backend = "postgresql+psycopg2"
42
elif backend == "mysql":
43
backend = "mysql+mysqldb"
45
raise Exception("Unrecognized backend: '%s'" % backend)
47
return ("%(backend)s://%(user)s:%(passwd)s@localhost/%(database)s"
48
% {'backend': backend, 'user': user, 'passwd': passwd,
49
'database': database})
52
def _is_backend_avail(backend, user, passwd, database):
54
connect_uri = _get_connect_string(backend, user, passwd, database)
55
engine = sqlalchemy.create_engine(connect_uri)
56
connection = engine.connect()
58
# intentionally catch all to handle exceptions even if we don't
59
# have any backend code loaded.
67
def _have_mysql(user, passwd, database):
68
present = os.environ.get('TEST_MYSQL_PRESENT')
70
return _is_backend_avail('mysql', user, passwd, database)
71
return present.lower() in ('', 'true')
74
def _have_postgresql(user, passwd, database):
75
present = os.environ.get('TEST_POSTGRESQL_PRESENT')
77
return _is_backend_avail('postgres', user, passwd, database)
78
return present.lower() in ('', 'true')
81
def get_db_connection_info(conn_pieces):
82
database = conn_pieces.path.strip('/')
83
loc_pieces = conn_pieces.netloc.split('@')
86
auth_pieces = loc_pieces[0].split(':')
89
if len(auth_pieces) > 1:
90
password = auth_pieces[1].strip()
92
return (user, password, database, host)
95
def _set_db_lock(lock_path=None, lock_prefix=None):
98
def wrapper(*args, **kwargs):
100
path = lock_path or os.environ.get("IRONIC_LOCK_PATH")
101
lock = lockfile.FileLock(os.path.join(path, lock_prefix))
103
LOG.debug(_('Got lock "%s"') % f.__name__)
104
return f(*args, **kwargs)
106
LOG.debug(_('Lock released "%s"') % f.__name__)
111
class BaseMigrationTestCase(test.BaseTestCase):
112
"""Base class fort testing of migration utils."""
114
def __init__(self, *args, **kwargs):
115
super(BaseMigrationTestCase, self).__init__(*args, **kwargs)
117
self.DEFAULT_CONFIG_FILE = os.path.join(os.path.dirname(__file__),
118
'test_migrations.conf')
119
# Test machines can set the TEST_MIGRATIONS_CONF variable
120
# to override the location of the config file for migration testing
121
self.CONFIG_FILE_PATH = os.environ.get('TEST_MIGRATIONS_CONF',
122
self.DEFAULT_CONFIG_FILE)
123
self.test_databases = {}
124
self.migration_api = None
127
super(BaseMigrationTestCase, self).setUp()
129
# Load test databases from the config file. Only do this
130
# once. No need to re-run this on each test...
131
LOG.debug('config_path is %s' % self.CONFIG_FILE_PATH)
132
if os.path.exists(self.CONFIG_FILE_PATH):
133
cp = moves.configparser.RawConfigParser()
135
cp.read(self.CONFIG_FILE_PATH)
136
defaults = cp.defaults()
137
for key, value in defaults.items():
138
self.test_databases[key] = value
139
except moves.configparser.ParsingError as e:
140
self.fail("Failed to read test_migrations.conf config "
141
"file. Got error: %s" % e)
143
self.fail("Failed to find test_migrations.conf config "
147
for key, value in self.test_databases.items():
148
self.engines[key] = sqlalchemy.create_engine(value)
150
# We start each test case with a completely blank slate.
151
self._reset_databases()
154
# We destroy the test data store between each test case,
155
# and recreate it, which ensures that we have no side-effects
157
self._reset_databases()
158
super(BaseMigrationTestCase, self).tearDown()
160
def execute_cmd(self, cmd=None):
161
process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
162
stderr=subprocess.STDOUT)
163
output = process.communicate()[0]
165
self.assertEqual(0, process.returncode,
166
"Failed to run: %s\n%s" % (cmd, output))
168
def _reset_pg(self, conn_pieces):
169
(user, password, database, host) = get_db_connection_info(conn_pieces)
170
os.environ['PGPASSWORD'] = password
171
os.environ['PGUSER'] = user
172
# note(boris-42): We must create and drop database, we can't
173
# drop database which we have connected to, so for such
174
# operations there is a special database template1.
175
sqlcmd = ("psql -w -U %(user)s -h %(host)s -c"
176
" '%(sql)s' -d template1")
178
sql = ("drop database if exists %s;") % database
179
droptable = sqlcmd % {'user': user, 'host': host, 'sql': sql}
180
self.execute_cmd(droptable)
182
sql = ("create database %s;") % database
183
createtable = sqlcmd % {'user': user, 'host': host, 'sql': sql}
184
self.execute_cmd(createtable)
186
os.unsetenv('PGPASSWORD')
187
os.unsetenv('PGUSER')
189
@_set_db_lock(lock_prefix='migration_tests-')
190
def _reset_databases(self):
191
for key, engine in self.engines.items():
192
conn_string = self.test_databases[key]
193
conn_pieces = urlutils.urlparse(conn_string)
195
if conn_string.startswith('sqlite'):
196
# We can just delete the SQLite database, which is
197
# the easiest and cleanest solution
198
db_path = conn_pieces.path.strip('/')
199
if os.path.exists(db_path):
201
# No need to recreate the SQLite DB. SQLite will
202
# create it for us if it's not there...
203
elif conn_string.startswith('mysql'):
204
# We can execute the MySQL client to destroy and re-create
205
# the MYSQL database, which is easier and less error-prone
206
# than using SQLAlchemy to do this via MetaData...trust me.
207
(user, password, database, host) = \
208
get_db_connection_info(conn_pieces)
209
sql = ("drop database if exists %(db)s; "
210
"create database %(db)s;") % {'db': database}
211
cmd = ("mysql -u \"%(user)s\" -p\"%(password)s\" -h %(host)s "
212
"-e \"%(sql)s\"") % {'user': user, 'password': password,
213
'host': host, 'sql': sql}
214
self.execute_cmd(cmd)
215
elif conn_string.startswith('postgresql'):
216
self._reset_pg(conn_pieces)
219
class WalkVersionsMixin(object):
220
def _walk_versions(self, engine=None, snake_walk=False, downgrade=True):
221
# Determine latest version script from the repo, then
222
# upgrade from 1 through to the latest, with no data
223
# in the databases. This just checks that the schema itself
224
# upgrades successfully.
226
# Place the database under version control
227
self.migration_api.version_control(engine, self.REPOSITORY,
229
self.assertEqual(self.INIT_VERSION,
230
self.migration_api.db_version(engine,
233
LOG.debug('latest version is %s' % self.REPOSITORY.latest)
234
versions = range(self.INIT_VERSION + 1, self.REPOSITORY.latest + 1)
236
for version in versions:
237
# upgrade -> downgrade -> upgrade
238
self._migrate_up(engine, version, with_data=True)
240
downgraded = self._migrate_down(
241
engine, version - 1, with_data=True)
243
self._migrate_up(engine, version)
246
# Now walk it back down to 0 from the latest, testing
247
# the downgrade paths.
248
for version in reversed(versions):
249
# downgrade -> upgrade -> downgrade
250
downgraded = self._migrate_down(engine, version - 1)
252
if snake_walk and downgraded:
253
self._migrate_up(engine, version)
254
self._migrate_down(engine, version - 1)
256
def _migrate_down(self, engine, version, with_data=False):
258
self.migration_api.downgrade(engine, self.REPOSITORY, version)
259
except NotImplementedError:
260
# NOTE(sirp): some migrations, namely release-level
261
# migrations, don't support a downgrade.
265
version, self.migration_api.db_version(engine, self.REPOSITORY))
267
# NOTE(sirp): `version` is what we're downgrading to (i.e. the 'target'
268
# version). So if we have any downgrade checks, they need to be run for
269
# the previous (higher numbered) migration.
271
post_downgrade = getattr(
272
self, "_post_downgrade_%03d" % (version + 1), None)
274
post_downgrade(engine)
278
def _migrate_up(self, engine, version, with_data=False):
279
"""migrate up to a new version of the db.
281
We allow for data insertion and post checks at every
282
migration version with special _pre_upgrade_### and
283
_check_### functions in the main test.
285
# NOTE(sdague): try block is here because it's impossible to debug
286
# where a failed data migration happens otherwise
290
pre_upgrade = getattr(
291
self, "_pre_upgrade_%03d" % version, None)
293
data = pre_upgrade(engine)
295
self.migration_api.upgrade(engine, self.REPOSITORY, version)
296
self.assertEqual(version,
297
self.migration_api.db_version(engine,
300
check = getattr(self, "_check_%03d" % version, None)
304
LOG.error("Failed to migrate to version %s on engine %s" %