~ubuntu-branches/ubuntu/utopic/ceilometer/utopic-proposed

« back to all changes in this revision

Viewing changes to ceilometer/openstack/common/db/sqlalchemy/session.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short
  • Date: 2014-10-16 14:07:11 UTC
  • mfrom: (1.2.1) (28.1.5 utopic-proposed)
  • Revision ID: package-import@ubuntu.com-20141016140711-95mki6bdkivvfr2x
Tags: 2014.2-0ubuntu1
New upstream release. 

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright 2010 United States Government as represented by the
2
 
# Administrator of the National Aeronautics and Space Administration.
3
 
# All Rights Reserved.
4
 
#
5
 
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
6
 
#    not use this file except in compliance with the License. You may obtain
7
 
#    a copy of the License at
8
 
#
9
 
#         http://www.apache.org/licenses/LICENSE-2.0
10
 
#
11
 
#    Unless required by applicable law or agreed to in writing, software
12
 
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13
 
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14
 
#    License for the specific language governing permissions and limitations
15
 
#    under the License.
16
 
 
17
 
"""Session Handling for SQLAlchemy backend.
18
 
 
19
 
Recommended ways to use sessions within this framework:
20
 
 
21
 
* Don't use them explicitly; this is like running with ``AUTOCOMMIT=1``.
22
 
  `model_query()` will implicitly use a session when called without one
23
 
  supplied. This is the ideal situation because it will allow queries
24
 
  to be automatically retried if the database connection is interrupted.
25
 
 
26
 
  .. note:: Automatic retry will be enabled in a future patch.
27
 
 
28
 
  It is generally fine to issue several queries in a row like this. Even though
29
 
  they may be run in separate transactions and/or separate sessions, each one
30
 
  will see the data from the prior calls. If needed, undo- or rollback-like
31
 
  functionality should be handled at a logical level. For an example, look at
32
 
  the code around quotas and `reservation_rollback()`.
33
 
 
34
 
  Examples:
35
 
 
36
 
  .. code:: python
37
 
 
38
 
    def get_foo(context, foo):
39
 
        return (model_query(context, models.Foo).
40
 
                filter_by(foo=foo).
41
 
                first())
42
 
 
43
 
    def update_foo(context, id, newfoo):
44
 
        (model_query(context, models.Foo).
45
 
                filter_by(id=id).
46
 
                update({'foo': newfoo}))
47
 
 
48
 
    def create_foo(context, values):
49
 
        foo_ref = models.Foo()
50
 
        foo_ref.update(values)
51
 
        foo_ref.save()
52
 
        return foo_ref
53
 
 
54
 
 
55
 
* Within the scope of a single method, keep all the reads and writes within
56
 
  the context managed by a single session. In this way, the session's
57
 
  `__exit__` handler will take care of calling `flush()` and `commit()` for
58
 
  you. If using this approach, you should not explicitly call `flush()` or
59
 
  `commit()`. Any error within the context of the session will cause the
60
 
  session to emit a `ROLLBACK`. Database errors like `IntegrityError` will be
61
 
  raised in `session`'s `__exit__` handler, and any try/except within the
62
 
  context managed by `session` will not be triggered. And catching other
63
 
  non-database errors in the session will not trigger the ROLLBACK, so
64
 
  exception handlers should  always be outside the session, unless the
65
 
  developer wants to do a partial commit on purpose. If the connection is
66
 
  dropped before this is possible, the database will implicitly roll back the
67
 
  transaction.
68
 
 
69
 
  .. note:: Statements in the session scope will not be automatically retried.
70
 
 
71
 
  If you create models within the session, they need to be added, but you
72
 
  do not need to call `model.save()`:
73
 
 
74
 
  .. code:: python
75
 
 
76
 
    def create_many_foo(context, foos):
77
 
        session = sessionmaker()
78
 
        with session.begin():
79
 
            for foo in foos:
80
 
                foo_ref = models.Foo()
81
 
                foo_ref.update(foo)
82
 
                session.add(foo_ref)
83
 
 
84
 
    def update_bar(context, foo_id, newbar):
85
 
        session = sessionmaker()
86
 
        with session.begin():
87
 
            foo_ref = (model_query(context, models.Foo, session).
88
 
                        filter_by(id=foo_id).
89
 
                        first())
90
 
            (model_query(context, models.Bar, session).
91
 
                        filter_by(id=foo_ref['bar_id']).
92
 
                        update({'bar': newbar}))
93
 
 
94
 
  .. note:: `update_bar` is a trivially simple example of using
95
 
     ``with session.begin``. Whereas `create_many_foo` is a good example of
96
 
     when a transaction is needed, it is always best to use as few queries as
97
 
     possible.
98
 
 
99
 
  The two queries in `update_bar` can be better expressed using a single query
100
 
  which avoids the need for an explicit transaction. It can be expressed like
101
 
  so:
102
 
 
103
 
  .. code:: python
104
 
 
105
 
    def update_bar(context, foo_id, newbar):
106
 
        subq = (model_query(context, models.Foo.id).
107
 
                filter_by(id=foo_id).
108
 
                limit(1).
109
 
                subquery())
110
 
        (model_query(context, models.Bar).
111
 
                filter_by(id=subq.as_scalar()).
112
 
                update({'bar': newbar}))
113
 
 
114
 
  For reference, this emits approximately the following SQL statement:
115
 
 
116
 
  .. code:: sql
117
 
 
118
 
    UPDATE bar SET bar = ${newbar}
119
 
        WHERE id=(SELECT bar_id FROM foo WHERE id = ${foo_id} LIMIT 1);
120
 
 
121
 
  .. note:: `create_duplicate_foo` is a trivially simple example of catching an
122
 
     exception while using ``with session.begin``. Here create two duplicate
123
 
     instances with same primary key, must catch the exception out of context
124
 
     managed by a single session:
125
 
 
126
 
  .. code:: python
127
 
 
128
 
    def create_duplicate_foo(context):
129
 
        foo1 = models.Foo()
130
 
        foo2 = models.Foo()
131
 
        foo1.id = foo2.id = 1
132
 
        session = sessionmaker()
133
 
        try:
134
 
            with session.begin():
135
 
                session.add(foo1)
136
 
                session.add(foo2)
137
 
        except exception.DBDuplicateEntry as e:
138
 
            handle_error(e)
139
 
 
140
 
* Passing an active session between methods. Sessions should only be passed
141
 
  to private methods. The private method must use a subtransaction; otherwise
142
 
  SQLAlchemy will throw an error when you call `session.begin()` on an existing
143
 
  transaction. Public methods should not accept a session parameter and should
144
 
  not be involved in sessions within the caller's scope.
145
 
 
146
 
  Note that this incurs more overhead in SQLAlchemy than the above means
147
 
  due to nesting transactions, and it is not possible to implicitly retry
148
 
  failed database operations when using this approach.
149
 
 
150
 
  This also makes code somewhat more difficult to read and debug, because a
151
 
  single database transaction spans more than one method. Error handling
152
 
  becomes less clear in this situation. When this is needed for code clarity,
153
 
  it should be clearly documented.
154
 
 
155
 
  .. code:: python
156
 
 
157
 
    def myfunc(foo):
158
 
        session = sessionmaker()
159
 
        with session.begin():
160
 
            # do some database things
161
 
            bar = _private_func(foo, session)
162
 
        return bar
163
 
 
164
 
    def _private_func(foo, session=None):
165
 
        if not session:
166
 
            session = sessionmaker()
167
 
        with session.begin(subtransaction=True):
168
 
            # do some other database things
169
 
        return bar
170
 
 
171
 
 
172
 
There are some things which it is best to avoid:
173
 
 
174
 
* Don't keep a transaction open any longer than necessary.
175
 
 
176
 
  This means that your ``with session.begin()`` block should be as short
177
 
  as possible, while still containing all the related calls for that
178
 
  transaction.
179
 
 
180
 
* Avoid ``with_lockmode('UPDATE')`` when possible.
181
 
 
182
 
  In MySQL/InnoDB, when a ``SELECT ... FOR UPDATE`` query does not match
183
 
  any rows, it will take a gap-lock. This is a form of write-lock on the
184
 
  "gap" where no rows exist, and prevents any other writes to that space.
185
 
  This can effectively prevent any INSERT into a table by locking the gap
186
 
  at the end of the index. Similar problems will occur if the SELECT FOR UPDATE
187
 
  has an overly broad WHERE clause, or doesn't properly use an index.
188
 
 
189
 
  One idea proposed at ODS Fall '12 was to use a normal SELECT to test the
190
 
  number of rows matching a query, and if only one row is returned,
191
 
  then issue the SELECT FOR UPDATE.
192
 
 
193
 
  The better long-term solution is to use
194
 
  ``INSERT .. ON DUPLICATE KEY UPDATE``.
195
 
  However, this can not be done until the "deleted" columns are removed and
196
 
  proper UNIQUE constraints are added to the tables.
197
 
 
198
 
 
199
 
Enabling soft deletes:
200
 
 
201
 
* To use/enable soft-deletes, the `SoftDeleteMixin` must be added
202
 
  to your model class. For example:
203
 
 
204
 
  .. code:: python
205
 
 
206
 
      class NovaBase(models.SoftDeleteMixin, models.ModelBase):
207
 
          pass
208
 
 
209
 
 
210
 
Efficient use of soft deletes:
211
 
 
212
 
* There are two possible ways to mark a record as deleted:
213
 
  `model.soft_delete()` and `query.soft_delete()`.
214
 
 
215
 
  The `model.soft_delete()` method works with a single already-fetched entry.
216
 
  `query.soft_delete()` makes only one db request for all entries that
217
 
  correspond to the query.
218
 
 
219
 
* In almost all cases you should use `query.soft_delete()`. Some examples:
220
 
 
221
 
  .. code:: python
222
 
 
223
 
        def soft_delete_bar():
224
 
            count = model_query(BarModel).find(some_condition).soft_delete()
225
 
            if count == 0:
226
 
                raise Exception("0 entries were soft deleted")
227
 
 
228
 
        def complex_soft_delete_with_synchronization_bar(session=None):
229
 
            if session is None:
230
 
                session = sessionmaker()
231
 
            with session.begin(subtransactions=True):
232
 
                count = (model_query(BarModel).
233
 
                            find(some_condition).
234
 
                            soft_delete(synchronize_session=True))
235
 
                            # Here synchronize_session is required, because we
236
 
                            # don't know what is going on in outer session.
237
 
                if count == 0:
238
 
                    raise Exception("0 entries were soft deleted")
239
 
 
240
 
* There is only one situation where `model.soft_delete()` is appropriate: when
241
 
  you fetch a single record, work with it, and mark it as deleted in the same
242
 
  transaction.
243
 
 
244
 
  .. code:: python
245
 
 
246
 
        def soft_delete_bar_model():
247
 
            session = sessionmaker()
248
 
            with session.begin():
249
 
                bar_ref = model_query(BarModel).find(some_condition).first()
250
 
                # Work with bar_ref
251
 
                bar_ref.soft_delete(session=session)
252
 
 
253
 
  However, if you need to work with all entries that correspond to query and
254
 
  then soft delete them you should use the `query.soft_delete()` method:
255
 
 
256
 
  .. code:: python
257
 
 
258
 
        def soft_delete_multi_models():
259
 
            session = sessionmaker()
260
 
            with session.begin():
261
 
                query = (model_query(BarModel, session=session).
262
 
                            find(some_condition))
263
 
                model_refs = query.all()
264
 
                # Work with model_refs
265
 
                query.soft_delete(synchronize_session=False)
266
 
                # synchronize_session=False should be set if there is no outer
267
 
                # session and these entries are not used after this.
268
 
 
269
 
  When working with many rows, it is very important to use query.soft_delete,
270
 
  which issues a single query. Using `model.soft_delete()`, as in the following
271
 
  example, is very inefficient.
272
 
 
273
 
  .. code:: python
274
 
 
275
 
        for bar_ref in bar_refs:
276
 
            bar_ref.soft_delete(session=session)
277
 
        # This will produce count(bar_refs) db requests.
278
 
 
279
 
"""
280
 
 
281
 
import functools
282
 
import logging
283
 
import re
284
 
import time
285
 
 
286
 
import six
287
 
from sqlalchemy import exc as sqla_exc
288
 
from sqlalchemy.interfaces import PoolListener
289
 
import sqlalchemy.orm
290
 
from sqlalchemy.pool import NullPool, StaticPool
291
 
from sqlalchemy.sql.expression import literal_column
292
 
 
293
 
from ceilometer.openstack.common.db import exception
294
 
from ceilometer.openstack.common.gettextutils import _LE, _LW
295
 
from ceilometer.openstack.common import timeutils
296
 
 
297
 
 
298
 
LOG = logging.getLogger(__name__)
299
 
 
300
 
 
301
 
class SqliteForeignKeysListener(PoolListener):
302
 
    """Ensures that the foreign key constraints are enforced in SQLite.
303
 
 
304
 
    The foreign key constraints are disabled by default in SQLite,
305
 
    so the foreign key constraints will be enabled here for every
306
 
    database connection
307
 
    """
308
 
    def connect(self, dbapi_con, con_record):
309
 
        dbapi_con.execute('pragma foreign_keys=ON')
310
 
 
311
 
 
312
 
# note(boris-42): In current versions of DB backends unique constraint
313
 
# violation messages follow the structure:
314
 
#
315
 
# sqlite:
316
 
# 1 column - (IntegrityError) column c1 is not unique
317
 
# N columns - (IntegrityError) column c1, c2, ..., N are not unique
318
 
#
319
 
# sqlite since 3.7.16:
320
 
# 1 column - (IntegrityError) UNIQUE constraint failed: tbl.k1
321
 
#
322
 
# N columns - (IntegrityError) UNIQUE constraint failed: tbl.k1, tbl.k2
323
 
#
324
 
# postgres:
325
 
# 1 column - (IntegrityError) duplicate key value violates unique
326
 
#               constraint "users_c1_key"
327
 
# N columns - (IntegrityError) duplicate key value violates unique
328
 
#               constraint "name_of_our_constraint"
329
 
#
330
 
# mysql:
331
 
# 1 column - (IntegrityError) (1062, "Duplicate entry 'value_of_c1' for key
332
 
#               'c1'")
333
 
# N columns - (IntegrityError) (1062, "Duplicate entry 'values joined
334
 
#               with -' for key 'name_of_our_constraint'")
335
 
#
336
 
# ibm_db_sa:
337
 
# N columns - (IntegrityError) SQL0803N  One or more values in the INSERT
338
 
#                statement, UPDATE statement, or foreign key update caused by a
339
 
#                DELETE statement are not valid because the primary key, unique
340
 
#                constraint or unique index identified by "2" constrains table
341
 
#                "NOVA.KEY_PAIRS" from having duplicate values for the index
342
 
#                key.
343
 
_DUP_KEY_RE_DB = {
344
 
    "sqlite": (re.compile(r"^.*columns?([^)]+)(is|are)\s+not\s+unique$"),
345
 
               re.compile(r"^.*UNIQUE\s+constraint\s+failed:\s+(.+)$")),
346
 
    "postgresql": (re.compile(r"^.*duplicate\s+key.*\"([^\"]+)\"\s*\n.*$"),),
347
 
    "mysql": (re.compile(r"^.*\(1062,.*'([^\']+)'\"\)$"),),
348
 
    "ibm_db_sa": (re.compile(r"^.*SQL0803N.*$"),),
349
 
}
350
 
 
351
 
 
352
 
def _raise_if_duplicate_entry_error(integrity_error, engine_name):
353
 
    """Raise exception if two entries are duplicated.
354
 
 
355
 
    In this function will be raised DBDuplicateEntry exception if integrity
356
 
    error wrap unique constraint violation.
357
 
    """
358
 
 
359
 
    def get_columns_from_uniq_cons_or_name(columns):
360
 
        # note(vsergeyev): UniqueConstraint name convention: "uniq_t0c10c2"
361
 
        #                  where `t` it is table name and columns `c1`, `c2`
362
 
        #                  are in UniqueConstraint.
363
 
        uniqbase = "uniq_"
364
 
        if not columns.startswith(uniqbase):
365
 
            if engine_name == "postgresql":
366
 
                return [columns[columns.index("_") + 1:columns.rindex("_")]]
367
 
            return [columns]
368
 
        return columns[len(uniqbase):].split("0")[1:]
369
 
 
370
 
    if engine_name not in ["ibm_db_sa", "mysql", "sqlite", "postgresql"]:
371
 
        return
372
 
 
373
 
    # FIXME(johannes): The usage of the .message attribute has been
374
 
    # deprecated since Python 2.6. However, the exceptions raised by
375
 
    # SQLAlchemy can differ when using unicode() and accessing .message.
376
 
    # An audit across all three supported engines will be necessary to
377
 
    # ensure there are no regressions.
378
 
    for pattern in _DUP_KEY_RE_DB[engine_name]:
379
 
        match = pattern.match(integrity_error.message)
380
 
        if match:
381
 
            break
382
 
    else:
383
 
        return
384
 
 
385
 
    # NOTE(mriedem): The ibm_db_sa integrity error message doesn't provide the
386
 
    # columns so we have to omit that from the DBDuplicateEntry error.
387
 
    columns = ''
388
 
 
389
 
    if engine_name != 'ibm_db_sa':
390
 
        columns = match.group(1)
391
 
 
392
 
    if engine_name == "sqlite":
393
 
        columns = [c.split('.')[-1] for c in columns.strip().split(", ")]
394
 
    else:
395
 
        columns = get_columns_from_uniq_cons_or_name(columns)
396
 
    raise exception.DBDuplicateEntry(columns, integrity_error)
397
 
 
398
 
 
399
 
# NOTE(comstud): In current versions of DB backends, Deadlock violation
400
 
# messages follow the structure:
401
 
#
402
 
# mysql:
403
 
# (OperationalError) (1213, 'Deadlock found when trying to get lock; try '
404
 
#                     'restarting transaction') <query_str> <query_args>
405
 
_DEADLOCK_RE_DB = {
406
 
    "mysql": re.compile(r"^.*\(1213, 'Deadlock.*")
407
 
}
408
 
 
409
 
 
410
 
def _raise_if_deadlock_error(operational_error, engine_name):
411
 
    """Raise exception on deadlock condition.
412
 
 
413
 
    Raise DBDeadlock exception if OperationalError contains a Deadlock
414
 
    condition.
415
 
    """
416
 
    re = _DEADLOCK_RE_DB.get(engine_name)
417
 
    if re is None:
418
 
        return
419
 
    # FIXME(johannes): The usage of the .message attribute has been
420
 
    # deprecated since Python 2.6. However, the exceptions raised by
421
 
    # SQLAlchemy can differ when using unicode() and accessing .message.
422
 
    # An audit across all three supported engines will be necessary to
423
 
    # ensure there are no regressions.
424
 
    m = re.match(operational_error.message)
425
 
    if not m:
426
 
        return
427
 
    raise exception.DBDeadlock(operational_error)
428
 
 
429
 
 
430
 
def _wrap_db_error(f):
431
 
    @functools.wraps(f)
432
 
    def _wrap(self, *args, **kwargs):
433
 
        try:
434
 
            assert issubclass(
435
 
                self.__class__, sqlalchemy.orm.session.Session
436
 
            ), ('_wrap_db_error() can only be applied to methods of '
437
 
                'subclasses of sqlalchemy.orm.session.Session.')
438
 
 
439
 
            return f(self, *args, **kwargs)
440
 
        except UnicodeEncodeError:
441
 
            raise exception.DBInvalidUnicodeParameter()
442
 
        except sqla_exc.OperationalError as e:
443
 
            _raise_if_db_connection_lost(e, self.bind)
444
 
            _raise_if_deadlock_error(e, self.bind.dialect.name)
445
 
            # NOTE(comstud): A lot of code is checking for OperationalError
446
 
            # so let's not wrap it for now.
447
 
            raise
448
 
        # note(boris-42): We should catch unique constraint violation and
449
 
        # wrap it by our own DBDuplicateEntry exception. Unique constraint
450
 
        # violation is wrapped by IntegrityError.
451
 
        except sqla_exc.IntegrityError as e:
452
 
            # note(boris-42): SqlAlchemy doesn't unify errors from different
453
 
            # DBs so we must do this. Also in some tables (for example
454
 
            # instance_types) there are more than one unique constraint. This
455
 
            # means we should get names of columns, which values violate
456
 
            # unique constraint, from error message.
457
 
            _raise_if_duplicate_entry_error(e, self.bind.dialect.name)
458
 
            raise exception.DBError(e)
459
 
        except Exception as e:
460
 
            LOG.exception(_LE('DB exception wrapped.'))
461
 
            raise exception.DBError(e)
462
 
    return _wrap
463
 
 
464
 
 
465
 
def _synchronous_switch_listener(dbapi_conn, connection_rec):
466
 
    """Switch sqlite connections to non-synchronous mode."""
467
 
    dbapi_conn.execute("PRAGMA synchronous = OFF")
468
 
 
469
 
 
470
 
def _add_regexp_listener(dbapi_con, con_record):
471
 
    """Add REGEXP function to sqlite connections."""
472
 
 
473
 
    def regexp(expr, item):
474
 
        reg = re.compile(expr)
475
 
        return reg.search(six.text_type(item)) is not None
476
 
    dbapi_con.create_function('regexp', 2, regexp)
477
 
 
478
 
 
479
 
def _thread_yield(dbapi_con, con_record):
480
 
    """Ensure other greenthreads get a chance to be executed.
481
 
 
482
 
    If we use eventlet.monkey_patch(), eventlet.greenthread.sleep(0) will
483
 
    execute instead of time.sleep(0).
484
 
    Force a context switch. With common database backends (eg MySQLdb and
485
 
    sqlite), there is no implicit yield caused by network I/O since they are
486
 
    implemented by C libraries that eventlet cannot monkey patch.
487
 
    """
488
 
    time.sleep(0)
489
 
 
490
 
 
491
 
def _ping_listener(engine, dbapi_conn, connection_rec, connection_proxy):
492
 
    """Ensures that MySQL and DB2 connections are alive.
493
 
 
494
 
    Borrowed from:
495
 
    http://groups.google.com/group/sqlalchemy/msg/a4ce563d802c929f
496
 
    """
497
 
    cursor = dbapi_conn.cursor()
498
 
    try:
499
 
        ping_sql = 'select 1'
500
 
        if engine.name == 'ibm_db_sa':
501
 
            # DB2 requires a table expression
502
 
            ping_sql = 'select 1 from (values (1)) AS t1'
503
 
        cursor.execute(ping_sql)
504
 
    except Exception as ex:
505
 
        if engine.dialect.is_disconnect(ex, dbapi_conn, cursor):
506
 
            msg = _LW('Database server has gone away: %s') % ex
507
 
            LOG.warning(msg)
508
 
 
509
 
            # if the database server has gone away, all connections in the pool
510
 
            # have become invalid and we can safely close all of them here,
511
 
            # rather than waste time on checking of every single connection
512
 
            engine.dispose()
513
 
 
514
 
            # this will be handled by SQLAlchemy and will force it to create
515
 
            # a new connection and retry the original action
516
 
            raise sqla_exc.DisconnectionError(msg)
517
 
        else:
518
 
            raise
519
 
 
520
 
 
521
 
def _set_session_sql_mode(dbapi_con, connection_rec, sql_mode=None):
522
 
    """Set the sql_mode session variable.
523
 
 
524
 
    MySQL supports several server modes. The default is None, but sessions
525
 
    may choose to enable server modes like TRADITIONAL, ANSI,
526
 
    several STRICT_* modes and others.
527
 
 
528
 
    Note: passing in '' (empty string) for sql_mode clears
529
 
    the SQL mode for the session, overriding a potentially set
530
 
    server default.
531
 
    """
532
 
 
533
 
    cursor = dbapi_con.cursor()
534
 
    cursor.execute("SET SESSION sql_mode = %s", [sql_mode])
535
 
 
536
 
 
537
 
def _mysql_get_effective_sql_mode(engine):
538
 
    """Returns the effective SQL mode for connections from the engine pool.
539
 
 
540
 
    Returns ``None`` if the mode isn't available, otherwise returns the mode.
541
 
 
542
 
    """
543
 
    # Get the real effective SQL mode. Even when unset by
544
 
    # our own config, the server may still be operating in a specific
545
 
    # SQL mode as set by the server configuration.
546
 
    # Also note that the checkout listener will be called on execute to
547
 
    # set the mode if it's registered.
548
 
    row = engine.execute("SHOW VARIABLES LIKE 'sql_mode'").fetchone()
549
 
    if row is None:
550
 
        return
551
 
    return row[1]
552
 
 
553
 
 
554
 
def _mysql_check_effective_sql_mode(engine):
555
 
    """Logs a message based on the effective SQL mode for MySQL connections."""
556
 
    realmode = _mysql_get_effective_sql_mode(engine)
557
 
 
558
 
    if realmode is None:
559
 
        LOG.warning(_LW('Unable to detect effective SQL mode'))
560
 
        return
561
 
 
562
 
    LOG.debug('MySQL server mode set to %s', realmode)
563
 
    # 'TRADITIONAL' mode enables several other modes, so
564
 
    # we need a substring match here
565
 
    if not ('TRADITIONAL' in realmode.upper() or
566
 
            'STRICT_ALL_TABLES' in realmode.upper()):
567
 
        LOG.warning(_LW("MySQL SQL mode is '%s', "
568
 
                        "consider enabling TRADITIONAL or STRICT_ALL_TABLES"),
569
 
                    realmode)
570
 
 
571
 
 
572
 
def _mysql_set_mode_callback(engine, sql_mode):
573
 
    if sql_mode is not None:
574
 
        mode_callback = functools.partial(_set_session_sql_mode,
575
 
                                          sql_mode=sql_mode)
576
 
        sqlalchemy.event.listen(engine, 'connect', mode_callback)
577
 
    _mysql_check_effective_sql_mode(engine)
578
 
 
579
 
 
580
 
def _is_db_connection_error(args):
581
 
    """Return True if error in connecting to db."""
582
 
    # NOTE(adam_g): This is currently MySQL specific and needs to be extended
583
 
    #               to support Postgres and others.
584
 
    # For the db2, the error code is -30081 since the db2 is still not ready
585
 
    conn_err_codes = ('2002', '2003', '2006', '2013', '-30081')
586
 
    for err_code in conn_err_codes:
587
 
        if args.find(err_code) != -1:
588
 
            return True
589
 
    return False
590
 
 
591
 
 
592
 
def _raise_if_db_connection_lost(error, engine):
593
 
    # NOTE(vsergeyev): Function is_disconnect(e, connection, cursor)
594
 
    #                  requires connection and cursor in incoming parameters,
595
 
    #                  but we have no possibility to create connection if DB
596
 
    #                  is not available, so in such case reconnect fails.
597
 
    #                  But is_disconnect() ignores these parameters, so it
598
 
    #                  makes sense to pass to function None as placeholder
599
 
    #                  instead of connection and cursor.
600
 
    if engine.dialect.is_disconnect(error, None, None):
601
 
        raise exception.DBConnectionError(error)
602
 
 
603
 
 
604
 
def create_engine(sql_connection, sqlite_fk=False, mysql_sql_mode=None,
605
 
                  idle_timeout=3600,
606
 
                  connection_debug=0, max_pool_size=None, max_overflow=None,
607
 
                  pool_timeout=None, sqlite_synchronous=True,
608
 
                  connection_trace=False, max_retries=10, retry_interval=10):
609
 
    """Return a new SQLAlchemy engine."""
610
 
 
611
 
    connection_dict = sqlalchemy.engine.url.make_url(sql_connection)
612
 
 
613
 
    engine_args = {
614
 
        "pool_recycle": idle_timeout,
615
 
        'convert_unicode': True,
616
 
    }
617
 
 
618
 
    logger = logging.getLogger('sqlalchemy.engine')
619
 
 
620
 
    # Map SQL debug level to Python log level
621
 
    if connection_debug >= 100:
622
 
        logger.setLevel(logging.DEBUG)
623
 
    elif connection_debug >= 50:
624
 
        logger.setLevel(logging.INFO)
625
 
    else:
626
 
        logger.setLevel(logging.WARNING)
627
 
 
628
 
    if "sqlite" in connection_dict.drivername:
629
 
        if sqlite_fk:
630
 
            engine_args["listeners"] = [SqliteForeignKeysListener()]
631
 
        engine_args["poolclass"] = NullPool
632
 
 
633
 
        if sql_connection == "sqlite://":
634
 
            engine_args["poolclass"] = StaticPool
635
 
            engine_args["connect_args"] = {'check_same_thread': False}
636
 
    else:
637
 
        if max_pool_size is not None:
638
 
            engine_args['pool_size'] = max_pool_size
639
 
        if max_overflow is not None:
640
 
            engine_args['max_overflow'] = max_overflow
641
 
        if pool_timeout is not None:
642
 
            engine_args['pool_timeout'] = pool_timeout
643
 
 
644
 
    engine = sqlalchemy.create_engine(sql_connection, **engine_args)
645
 
 
646
 
    sqlalchemy.event.listen(engine, 'checkin', _thread_yield)
647
 
 
648
 
    if engine.name in ['mysql', 'ibm_db_sa']:
649
 
        ping_callback = functools.partial(_ping_listener, engine)
650
 
        sqlalchemy.event.listen(engine, 'checkout', ping_callback)
651
 
        if engine.name == 'mysql':
652
 
            if mysql_sql_mode:
653
 
                _mysql_set_mode_callback(engine, mysql_sql_mode)
654
 
    elif 'sqlite' in connection_dict.drivername:
655
 
        if not sqlite_synchronous:
656
 
            sqlalchemy.event.listen(engine, 'connect',
657
 
                                    _synchronous_switch_listener)
658
 
        sqlalchemy.event.listen(engine, 'connect', _add_regexp_listener)
659
 
 
660
 
    if connection_trace and engine.dialect.dbapi.__name__ == 'MySQLdb':
661
 
        _patch_mysqldb_with_stacktrace_comments()
662
 
 
663
 
    try:
664
 
        engine.connect()
665
 
    except sqla_exc.OperationalError as e:
666
 
        if not _is_db_connection_error(e.args[0]):
667
 
            raise
668
 
 
669
 
        remaining = max_retries
670
 
        if remaining == -1:
671
 
            remaining = 'infinite'
672
 
        while True:
673
 
            msg = _LW('SQL connection failed. %s attempts left.')
674
 
            LOG.warning(msg % remaining)
675
 
            if remaining != 'infinite':
676
 
                remaining -= 1
677
 
            time.sleep(retry_interval)
678
 
            try:
679
 
                engine.connect()
680
 
                break
681
 
            except sqla_exc.OperationalError as e:
682
 
                if (remaining != 'infinite' and remaining == 0) or \
683
 
                        not _is_db_connection_error(e.args[0]):
684
 
                    raise
685
 
    return engine
686
 
 
687
 
 
688
 
class Query(sqlalchemy.orm.query.Query):
689
 
    """Subclass of sqlalchemy.query with soft_delete() method."""
690
 
    def soft_delete(self, synchronize_session='evaluate'):
691
 
        return self.update({'deleted': literal_column('id'),
692
 
                            'updated_at': literal_column('updated_at'),
693
 
                            'deleted_at': timeutils.utcnow()},
694
 
                           synchronize_session=synchronize_session)
695
 
 
696
 
 
697
 
class Session(sqlalchemy.orm.session.Session):
698
 
    """Custom Session class to avoid SqlAlchemy Session monkey patching."""
699
 
    @_wrap_db_error
700
 
    def query(self, *args, **kwargs):
701
 
        return super(Session, self).query(*args, **kwargs)
702
 
 
703
 
    @_wrap_db_error
704
 
    def flush(self, *args, **kwargs):
705
 
        return super(Session, self).flush(*args, **kwargs)
706
 
 
707
 
    @_wrap_db_error
708
 
    def execute(self, *args, **kwargs):
709
 
        return super(Session, self).execute(*args, **kwargs)
710
 
 
711
 
 
712
 
def get_maker(engine, autocommit=True, expire_on_commit=False):
713
 
    """Return a SQLAlchemy sessionmaker using the given engine."""
714
 
    return sqlalchemy.orm.sessionmaker(bind=engine,
715
 
                                       class_=Session,
716
 
                                       autocommit=autocommit,
717
 
                                       expire_on_commit=expire_on_commit,
718
 
                                       query_cls=Query)
719
 
 
720
 
 
721
 
def _patch_mysqldb_with_stacktrace_comments():
722
 
    """Adds current stack trace as a comment in queries.
723
 
 
724
 
    Patches MySQLdb.cursors.BaseCursor._do_query.
725
 
    """
726
 
    import MySQLdb.cursors
727
 
    import traceback
728
 
 
729
 
    old_mysql_do_query = MySQLdb.cursors.BaseCursor._do_query
730
 
 
731
 
    def _do_query(self, q):
732
 
        stack = ''
733
 
        for filename, line, method, function in traceback.extract_stack():
734
 
            # exclude various common things from trace
735
 
            if filename.endswith('session.py') and method == '_do_query':
736
 
                continue
737
 
            if filename.endswith('api.py') and method == 'wrapper':
738
 
                continue
739
 
            if filename.endswith('utils.py') and method == '_inner':
740
 
                continue
741
 
            if filename.endswith('exception.py') and method == '_wrap':
742
 
                continue
743
 
            # db/api is just a wrapper around db/sqlalchemy/api
744
 
            if filename.endswith('db/api.py'):
745
 
                continue
746
 
            # only trace inside ceilometer
747
 
            index = filename.rfind('ceilometer')
748
 
            if index == -1:
749
 
                continue
750
 
            stack += "File:%s:%s Method:%s() Line:%s | " \
751
 
                     % (filename[index:], line, method, function)
752
 
 
753
 
        # strip trailing " | " from stack
754
 
        if stack:
755
 
            stack = stack[:-3]
756
 
            qq = "%s /* %s */" % (q, stack)
757
 
        else:
758
 
            qq = q
759
 
        old_mysql_do_query(self, qq)
760
 
 
761
 
    setattr(MySQLdb.cursors.BaseCursor, '_do_query', _do_query)
762
 
 
763
 
 
764
 
class EngineFacade(object):
765
 
    """A helper class for removing of global engine instances from ceilometer.db.
766
 
 
767
 
    As a library, ceilometer.db can't decide where to store/when to create engine
768
 
    and sessionmaker instances, so this must be left for a target application.
769
 
 
770
 
    On the other hand, in order to simplify the adoption of ceilometer.db changes,
771
 
    we'll provide a helper class, which creates engine and sessionmaker
772
 
    on its instantiation and provides get_engine()/get_session() methods
773
 
    that are compatible with corresponding utility functions that currently
774
 
    exist in target projects, e.g. in Nova.
775
 
 
776
 
    engine/sessionmaker instances will still be global (and they are meant to
777
 
    be global), but they will be stored in the app context, rather that in the
778
 
    ceilometer.db context.
779
 
 
780
 
    Note: using of this helper is completely optional and you are encouraged to
781
 
    integrate engine/sessionmaker instances into your apps any way you like
782
 
    (e.g. one might want to bind a session to a request context). Two important
783
 
    things to remember:
784
 
 
785
 
    1. An Engine instance is effectively a pool of DB connections, so it's
786
 
       meant to be shared (and it's thread-safe).
787
 
    2. A Session instance is not meant to be shared and represents a DB
788
 
       transactional context (i.e. it's not thread-safe). sessionmaker is
789
 
       a factory of sessions.
790
 
 
791
 
    """
792
 
 
793
 
    def __init__(self, sql_connection,
794
 
                 sqlite_fk=False, autocommit=True,
795
 
                 expire_on_commit=False, **kwargs):
796
 
        """Initialize engine and sessionmaker instances.
797
 
 
798
 
        :param sqlite_fk: enable foreign keys in SQLite
799
 
        :type sqlite_fk: bool
800
 
 
801
 
        :param autocommit: use autocommit mode for created Session instances
802
 
        :type autocommit: bool
803
 
 
804
 
        :param expire_on_commit: expire session objects on commit
805
 
        :type expire_on_commit: bool
806
 
 
807
 
        Keyword arguments:
808
 
 
809
 
        :keyword mysql_sql_mode: the SQL mode to be used for MySQL sessions.
810
 
                                 (defaults to TRADITIONAL)
811
 
        :keyword idle_timeout: timeout before idle sql connections are reaped
812
 
                               (defaults to 3600)
813
 
        :keyword connection_debug: verbosity of SQL debugging information.
814
 
                                   0=None, 100=Everything (defaults to 0)
815
 
        :keyword max_pool_size: maximum number of SQL connections to keep open
816
 
                                in a pool (defaults to SQLAlchemy settings)
817
 
        :keyword max_overflow: if set, use this value for max_overflow with
818
 
                               sqlalchemy (defaults to SQLAlchemy settings)
819
 
        :keyword pool_timeout: if set, use this value for pool_timeout with
820
 
                               sqlalchemy (defaults to SQLAlchemy settings)
821
 
        :keyword sqlite_synchronous: if True, SQLite uses synchronous mode
822
 
                                     (defaults to True)
823
 
        :keyword connection_trace: add python stack traces to SQL as comment
824
 
                                   strings (defaults to False)
825
 
        :keyword max_retries: maximum db connection retries during startup.
826
 
                              (setting -1 implies an infinite retry count)
827
 
                              (defaults to 10)
828
 
        :keyword retry_interval: interval between retries of opening a sql
829
 
                                 connection (defaults to 10)
830
 
 
831
 
        """
832
 
 
833
 
        super(EngineFacade, self).__init__()
834
 
 
835
 
        self._engine = create_engine(
836
 
            sql_connection=sql_connection,
837
 
            sqlite_fk=sqlite_fk,
838
 
            mysql_sql_mode=kwargs.get('mysql_sql_mode', 'TRADITIONAL'),
839
 
            idle_timeout=kwargs.get('idle_timeout', 3600),
840
 
            connection_debug=kwargs.get('connection_debug', 0),
841
 
            max_pool_size=kwargs.get('max_pool_size'),
842
 
            max_overflow=kwargs.get('max_overflow'),
843
 
            pool_timeout=kwargs.get('pool_timeout'),
844
 
            sqlite_synchronous=kwargs.get('sqlite_synchronous', True),
845
 
            connection_trace=kwargs.get('connection_trace', False),
846
 
            max_retries=kwargs.get('max_retries', 10),
847
 
            retry_interval=kwargs.get('retry_interval', 10))
848
 
        self._session_maker = get_maker(
849
 
            engine=self._engine,
850
 
            autocommit=autocommit,
851
 
            expire_on_commit=expire_on_commit)
852
 
 
853
 
    def get_engine(self):
854
 
        """Get the engine instance (note, that it's shared)."""
855
 
 
856
 
        return self._engine
857
 
 
858
 
    def get_session(self, **kwargs):
859
 
        """Get a Session instance.
860
 
 
861
 
        If passed, keyword arguments values override the ones used when the
862
 
        sessionmaker instance was created.
863
 
 
864
 
        :keyword autocommit: use autocommit mode for created Session instances
865
 
        :type autocommit: bool
866
 
 
867
 
        :keyword expire_on_commit: expire session objects on commit
868
 
        :type expire_on_commit: bool
869
 
 
870
 
        """
871
 
 
872
 
        for arg in kwargs:
873
 
            if arg not in ('autocommit', 'expire_on_commit'):
874
 
                del kwargs[arg]
875
 
 
876
 
        return self._session_maker(**kwargs)
877
 
 
878
 
    @classmethod
879
 
    def from_config(cls, connection_string, conf,
880
 
                    sqlite_fk=False, autocommit=True, expire_on_commit=False):
881
 
        """Initialize EngineFacade using oslo.config config instance options.
882
 
 
883
 
        :param connection_string: SQLAlchemy connection string
884
 
        :type connection_string: string
885
 
 
886
 
        :param conf: oslo.config config instance
887
 
        :type conf: oslo.config.cfg.ConfigOpts
888
 
 
889
 
        :param sqlite_fk: enable foreign keys in SQLite
890
 
        :type sqlite_fk: bool
891
 
 
892
 
        :param autocommit: use autocommit mode for created Session instances
893
 
        :type autocommit: bool
894
 
 
895
 
        :param expire_on_commit: expire session objects on commit
896
 
        :type expire_on_commit: bool
897
 
 
898
 
        """
899
 
 
900
 
        return cls(sql_connection=connection_string,
901
 
                   sqlite_fk=sqlite_fk,
902
 
                   autocommit=autocommit,
903
 
                   expire_on_commit=expire_on_commit,
904
 
                   **dict(conf.database.items()))