~ubuntu-branches/ubuntu/trusty/ceilometer/trusty-proposed

« back to all changes in this revision

Viewing changes to ceilometer/openstack/common/db/sqlalchemy/session.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short, James Page, Chuck Short
  • Date: 2014-01-23 15:08:11 UTC
  • mfrom: (1.1.11)
  • Revision ID: package-import@ubuntu.com-20140123150811-1zaismsuyh1hcl8y
Tags: 2014.1~b2-0ubuntu1
[ James Page ]
* d/control: Add python-jsonpath-rw to BD's.
* d/p/fix-setup-requirements.patch: Bump WebOb to support < 1.4.
 (LP: #1261101)

[ Chuck Short ]
* New upstream version.
* debian/control, debian/ceilometer-common.install: Split out
  ceilometer-alarm-evaluator and ceilometer-alarm-notifier into their
  own packages. (LP: #1250002)
* debian/ceilometer-agent-central.logrotate,
  debian/ceilometer-agent-compute.logrotate,
  debian/ceilometer-api.logrotate,
  debian/ceilometer-collector.logrotate: Add logrotate files, 
  thanks to Ahmed Rahal. (LP: #1224223)
* Fix typos in upstart files.

Show diffs side-by-side

added added

removed removed

Lines of Context:
21
21
* Call set_defaults with the minimal of the following kwargs:
22
22
    sql_connection, sqlite_db
23
23
 
24
 
  Example:
 
24
  Example::
25
25
 
26
26
    session.set_defaults(
27
27
        sql_connection="sqlite:///var/lib/ceilometer/sqlite.db",
42
42
  functionality should be handled at a logical level. For an example, look at
43
43
  the code around quotas and reservation_rollback().
44
44
 
45
 
  Examples:
 
45
  Examples::
46
46
 
47
47
    def get_foo(context, foo):
48
 
        return model_query(context, models.Foo).\
49
 
                filter_by(foo=foo).\
50
 
                first()
 
48
        return (model_query(context, models.Foo).
 
49
                filter_by(foo=foo).
 
50
                first())
51
51
 
52
52
    def update_foo(context, id, newfoo):
53
 
        model_query(context, models.Foo).\
54
 
                filter_by(id=id).\
55
 
                update({'foo': newfoo})
 
53
        (model_query(context, models.Foo).
 
54
                filter_by(id=id).
 
55
                update({'foo': newfoo}))
56
56
 
57
57
    def create_foo(context, values):
58
58
        foo_ref = models.Foo()
66
66
  handler will take care of calling flush() and commit() for you.
67
67
  If using this approach, you should not explicitly call flush() or commit().
68
68
  Any error within the context of the session will cause the session to emit
69
 
  a ROLLBACK. If the connection is dropped before this is possible, the
70
 
  database will implicitly rollback the transaction.
 
69
  a ROLLBACK. Database Errors like IntegrityError will be raised in
 
70
  session's __exit__ handler, and any try/except within the context managed
 
71
  by session will not be triggered. And catching other non-database errors in
 
72
  the session will not trigger the ROLLBACK, so exception handlers should
 
73
  always be outside the session, unless the developer wants to do a partial
 
74
  commit on purpose. If the connection is dropped before this is possible,
 
75
  the database will implicitly roll back the transaction.
71
76
 
72
77
     Note: statements in the session scope will not be automatically retried.
73
78
 
74
79
  If you create models within the session, they need to be added, but you
75
80
  do not need to call model.save()
76
81
 
 
82
  ::
 
83
 
77
84
    def create_many_foo(context, foos):
78
85
        session = get_session()
79
86
        with session.begin():
85
92
    def update_bar(context, foo_id, newbar):
86
93
        session = get_session()
87
94
        with session.begin():
88
 
            foo_ref = model_query(context, models.Foo, session).\
89
 
                        filter_by(id=foo_id).\
90
 
                        first()
91
 
            model_query(context, models.Bar, session).\
92
 
                        filter_by(id=foo_ref['bar_id']).\
93
 
                        update({'bar': newbar})
 
95
            foo_ref = (model_query(context, models.Foo, session).
 
96
                        filter_by(id=foo_id).
 
97
                        first())
 
98
            (model_query(context, models.Bar, session).
 
99
                        filter_by(id=foo_ref['bar_id']).
 
100
                        update({'bar': newbar}))
94
101
 
95
102
  Note: update_bar is a trivially simple example of using "with session.begin".
96
103
  Whereas create_many_foo is a good example of when a transaction is needed,
97
104
  it is always best to use as few queries as possible. The two queries in
98
105
  update_bar can be better expressed using a single query which avoids
99
 
  the need for an explicit transaction. It can be expressed like so:
 
106
  the need for an explicit transaction. It can be expressed like so::
100
107
 
101
108
    def update_bar(context, foo_id, newbar):
102
 
        subq = model_query(context, models.Foo.id).\
103
 
                filter_by(id=foo_id).\
104
 
                limit(1).\
105
 
                subquery()
106
 
        model_query(context, models.Bar).\
107
 
                filter_by(id=subq.as_scalar()).\
108
 
                update({'bar': newbar})
 
109
        subq = (model_query(context, models.Foo.id).
 
110
                filter_by(id=foo_id).
 
111
                limit(1).
 
112
                subquery())
 
113
        (model_query(context, models.Bar).
 
114
                filter_by(id=subq.as_scalar()).
 
115
                update({'bar': newbar}))
109
116
 
110
 
  For reference, this emits approximagely the following SQL statement:
 
117
  For reference, this emits approximately the following SQL statement::
111
118
 
112
119
    UPDATE bar SET bar = ${newbar}
113
120
        WHERE id=(SELECT bar_id FROM foo WHERE id = ${foo_id} LIMIT 1);
114
121
 
 
122
  Note: create_duplicate_foo is a trivially simple example of catching an
 
123
  exception while using "with session.begin". Here create two duplicate
 
124
  instances with same primary key, must catch the exception out of context
 
125
  managed by a single session:
 
126
 
 
127
    def create_duplicate_foo(context):
 
128
        foo1 = models.Foo()
 
129
        foo2 = models.Foo()
 
130
        foo1.id = foo2.id = 1
 
131
        session = get_session()
 
132
        try:
 
133
            with session.begin():
 
134
                session.add(foo1)
 
135
                session.add(foo2)
 
136
        except exception.DBDuplicateEntry as e:
 
137
            handle_error(e)
 
138
 
115
139
* Passing an active session between methods. Sessions should only be passed
116
140
  to private methods. The private method must use a subtransaction; otherwise
117
141
  SQLAlchemy will throw an error when you call session.begin() on an existing
127
151
  becomes less clear in this situation. When this is needed for code clarity,
128
152
  it should be clearly documented.
129
153
 
 
154
  ::
 
155
 
130
156
    def myfunc(foo):
131
157
        session = get_session()
132
158
        with session.begin():
171
197
Enabling soft deletes:
172
198
 
173
199
* To use/enable soft-deletes, the SoftDeleteMixin must be added
174
 
  to your model class. For example:
 
200
  to your model class. For example::
175
201
 
176
202
      class NovaBase(models.SoftDeleteMixin, models.ModelBase):
177
203
          pass
179
205
 
180
206
Efficient use of soft deletes:
181
207
 
182
 
* There are two possible ways to mark a record as deleted:
 
208
* There are two possible ways to mark a record as deleted::
 
209
 
183
210
    model.soft_delete() and query.soft_delete().
184
211
 
185
212
  model.soft_delete() method works with single already fetched entry.
186
213
  query.soft_delete() makes only one db request for all entries that correspond
187
214
  to query.
188
215
 
189
 
* In almost all cases you should use query.soft_delete(). Some examples:
 
216
* In almost all cases you should use query.soft_delete(). Some examples::
190
217
 
191
218
        def soft_delete_bar():
192
219
            count = model_query(BarModel).find(some_condition).soft_delete()
197
224
            if session is None:
198
225
                session = get_session()
199
226
            with session.begin(subtransactions=True):
200
 
                count = model_query(BarModel).\
201
 
                            find(some_condition).\
202
 
                            soft_delete(synchronize_session=True)
 
227
                count = (model_query(BarModel).
 
228
                            find(some_condition).
 
229
                            soft_delete(synchronize_session=True))
203
230
                            # Here synchronize_session is required, because we
204
231
                            # don't know what is going on in outer session.
205
232
                if count == 0:
209
236
  you fetch a single record, work with it, and mark it as deleted in the same
210
237
  transaction.
211
238
 
 
239
  ::
 
240
 
212
241
        def soft_delete_bar_model():
213
242
            session = get_session()
214
243
            with session.begin():
217
246
                bar_ref.soft_delete(session=session)
218
247
 
219
248
  However, if you need to work with all entries that correspond to query and
220
 
  then soft delete them you should use query.soft_delete() method:
 
249
  then soft delete them you should use query.soft_delete() method::
221
250
 
222
251
        def soft_delete_multi_models():
223
252
            session = get_session()
224
253
            with session.begin():
225
 
                query = model_query(BarModel, session=session).\
226
 
                            find(some_condition)
 
254
                query = (model_query(BarModel, session=session).
 
255
                            find(some_condition))
227
256
                model_refs = query.all()
228
257
                # Work with model_refs
229
258
                query.soft_delete(synchronize_session=False)
234
263
  which issues a single query. Using model.soft_delete(), as in the following
235
264
  example, is very inefficient.
236
265
 
 
266
  ::
 
267
 
237
268
        for bar_ref in bar_refs:
238
269
            bar_ref.soft_delete(session=session)
239
270
        # This will produce count(bar_refs) db requests.
247
278
from oslo.config import cfg
248
279
import six
249
280
from sqlalchemy import exc as sqla_exc
250
 
import sqlalchemy.interfaces
251
281
from sqlalchemy.interfaces import PoolListener
252
282
import sqlalchemy.orm
253
283
from sqlalchemy.pool import NullPool, StaticPool
254
284
from sqlalchemy.sql.expression import literal_column
255
285
 
256
286
from ceilometer.openstack.common.db import exception
257
 
from ceilometer.openstack.common.gettextutils import _  # noqa
 
287
from ceilometer.openstack.common.gettextutils import _
258
288
from ceilometer.openstack.common import log as logging
259
289
from ceilometer.openstack.common import timeutils
260
290
 
274
304
                       '../', '$sqlite_db')),
275
305
               help='The SQLAlchemy connection string used to connect to the '
276
306
                    'database',
 
307
               secret=True,
277
308
               deprecated_opts=[cfg.DeprecatedOpt('sql_connection',
278
309
                                                  group='DEFAULT'),
279
310
                                cfg.DeprecatedOpt('sql_connection',
282
313
                                                  group='sql'), ]),
283
314
    cfg.StrOpt('slave_connection',
284
315
               default='',
 
316
               secret=True,
285
317
               help='The SQLAlchemy connection string used to connect to the '
286
318
                    'slave database'),
287
319
    cfg.IntOpt('idle_timeout',
289
321
               deprecated_opts=[cfg.DeprecatedOpt('sql_idle_timeout',
290
322
                                                  group='DEFAULT'),
291
323
                                cfg.DeprecatedOpt('sql_idle_timeout',
292
 
                                                  group='DATABASE')],
 
324
                                                  group='DATABASE'),
 
325
                                cfg.DeprecatedOpt('idle_timeout',
 
326
                                                  group='sql')],
293
327
               help='timeout before idle sql connections are reaped'),
294
328
    cfg.IntOpt('min_pool_size',
295
329
               default=1,
407
441
        dbapi_con.execute('pragma foreign_keys=ON')
408
442
 
409
443
 
410
 
def get_session(autocommit=True, expire_on_commit=False,
411
 
                sqlite_fk=False, slave_session=False):
 
444
def get_session(autocommit=True, expire_on_commit=False, sqlite_fk=False,
 
445
                slave_session=False, mysql_traditional_mode=False):
412
446
    """Return a SQLAlchemy session."""
413
447
    global _MAKER
414
448
    global _SLAVE_MAKER
418
452
        maker = _SLAVE_MAKER
419
453
 
420
454
    if maker is None:
421
 
        engine = get_engine(sqlite_fk=sqlite_fk, slave_engine=slave_session)
 
455
        engine = get_engine(sqlite_fk=sqlite_fk, slave_engine=slave_session,
 
456
                            mysql_traditional_mode=mysql_traditional_mode)
422
457
        maker = get_maker(engine, autocommit, expire_on_commit)
423
458
 
424
459
    if slave_session:
437
472
# 1 column - (IntegrityError) column c1 is not unique
438
473
# N columns - (IntegrityError) column c1, c2, ..., N are not unique
439
474
#
 
475
# sqlite since 3.7.16:
 
476
# 1 column - (IntegrityError) UNIQUE constraint failed: k1
 
477
#
 
478
# N columns - (IntegrityError) UNIQUE constraint failed: k1, k2
 
479
#
440
480
# postgres:
441
481
# 1 column - (IntegrityError) duplicate key value violates unique
442
482
#               constraint "users_c1_key"
449
489
# N columns - (IntegrityError) (1062, "Duplicate entry 'values joined
450
490
#               with -' for key 'name_of_our_constraint'")
451
491
_DUP_KEY_RE_DB = {
452
 
    "sqlite": re.compile(r"^.*columns?([^)]+)(is|are)\s+not\s+unique$"),
453
 
    "postgresql": re.compile(r"^.*duplicate\s+key.*\"([^\"]+)\"\s*\n.*$"),
454
 
    "mysql": re.compile(r"^.*\(1062,.*'([^\']+)'\"\)$")
 
492
    "sqlite": (re.compile(r"^.*columns?([^)]+)(is|are)\s+not\s+unique$"),
 
493
               re.compile(r"^.*UNIQUE\s+constraint\s+failed:\s+(.+)$")),
 
494
    "postgresql": (re.compile(r"^.*duplicate\s+key.*\"([^\"]+)\"\s*\n.*$"),),
 
495
    "mysql": (re.compile(r"^.*\(1062,.*'([^\']+)'\"\)$"),)
455
496
}
456
497
 
457
498
 
481
522
    # SQLAlchemy can differ when using unicode() and accessing .message.
482
523
    # An audit across all three supported engines will be necessary to
483
524
    # ensure there are no regressions.
484
 
    m = _DUP_KEY_RE_DB[engine_name].match(integrity_error.message)
485
 
    if not m:
 
525
    for pattern in _DUP_KEY_RE_DB[engine_name]:
 
526
        match = pattern.match(integrity_error.message)
 
527
        if match:
 
528
            break
 
529
    else:
486
530
        return
487
 
    columns = m.group(1)
 
531
 
 
532
    columns = match.group(1)
488
533
 
489
534
    if engine_name == "sqlite":
490
535
        columns = columns.strip().split(", ")
553
598
    return _wrap
554
599
 
555
600
 
556
 
def get_engine(sqlite_fk=False, slave_engine=False):
 
601
def get_engine(sqlite_fk=False, slave_engine=False,
 
602
               mysql_traditional_mode=False):
557
603
    """Return a SQLAlchemy engine."""
558
604
    global _ENGINE
559
605
    global _SLAVE_ENGINE
565
611
        db_uri = CONF.database.slave_connection
566
612
 
567
613
    if engine is None:
568
 
        engine = create_engine(db_uri,
569
 
                               sqlite_fk=sqlite_fk)
 
614
        engine = create_engine(db_uri, sqlite_fk=sqlite_fk,
 
615
                               mysql_traditional_mode=mysql_traditional_mode)
570
616
    if slave_engine:
571
617
        _SLAVE_ENGINE = engine
572
618
    else:
601
647
    time.sleep(0)
602
648
 
603
649
 
604
 
def _ping_listener(dbapi_conn, connection_rec, connection_proxy):
605
 
    """Ensures that MySQL connections checked out of the pool are alive.
 
650
def _ping_listener(engine, dbapi_conn, connection_rec, connection_proxy):
 
651
    """Ensures that MySQL and DB2 connections are alive.
606
652
 
607
653
    Borrowed from:
608
654
    http://groups.google.com/group/sqlalchemy/msg/a4ce563d802c929f
609
655
    """
 
656
    cursor = dbapi_conn.cursor()
610
657
    try:
611
 
        dbapi_conn.cursor().execute('select 1')
612
 
    except dbapi_conn.OperationalError as ex:
613
 
        if ex.args[0] in (2006, 2013, 2014, 2045, 2055):
614
 
            LOG.warn(_('Got mysql server has gone away: %s'), ex)
615
 
            raise sqla_exc.DisconnectionError("Database server went away")
 
658
        ping_sql = 'select 1'
 
659
        if engine.name == 'ibm_db_sa':
 
660
            # DB2 requires a table expression
 
661
            ping_sql = 'select 1 from (values (1)) AS t1'
 
662
        cursor.execute(ping_sql)
 
663
    except Exception as ex:
 
664
        if engine.dialect.is_disconnect(ex, dbapi_conn, cursor):
 
665
            msg = _('Database server has gone away: %s') % ex
 
666
            LOG.warning(msg)
 
667
            raise sqla_exc.DisconnectionError(msg)
616
668
        else:
617
669
            raise
618
670
 
619
671
 
 
672
def _set_mode_traditional(dbapi_con, connection_rec, connection_proxy):
 
673
    """Set engine mode to 'traditional'.
 
674
 
 
675
    Required to prevent silent truncates at insert or update operations
 
676
    under MySQL. By default MySQL truncates inserted string if it longer
 
677
    than a declared field just with warning. That is fraught with data
 
678
    corruption.
 
679
    """
 
680
    dbapi_con.cursor().execute("SET SESSION sql_mode = TRADITIONAL;")
 
681
 
 
682
 
620
683
def _is_db_connection_error(args):
621
684
    """Return True if error in connecting to db."""
622
685
    # NOTE(adam_g): This is currently MySQL specific and needs to be extended
629
692
    return False
630
693
 
631
694
 
632
 
def create_engine(sql_connection, sqlite_fk=False):
 
695
def create_engine(sql_connection, sqlite_fk=False,
 
696
                  mysql_traditional_mode=False):
633
697
    """Return a new SQLAlchemy engine."""
634
698
    # NOTE(geekinutah): At this point we could be connecting to the normal
635
699
    #                   db handle or the slave db handle. Things like
670
734
 
671
735
    sqlalchemy.event.listen(engine, 'checkin', _thread_yield)
672
736
 
673
 
    if 'mysql' in connection_dict.drivername:
674
 
        sqlalchemy.event.listen(engine, 'checkout', _ping_listener)
 
737
    if engine.name in ['mysql', 'ibm_db_sa']:
 
738
        callback = functools.partial(_ping_listener, engine)
 
739
        sqlalchemy.event.listen(engine, 'checkout', callback)
 
740
        if mysql_traditional_mode:
 
741
            sqlalchemy.event.listen(engine, 'checkout', _set_mode_traditional)
 
742
        else:
 
743
            LOG.warning(_("This application has not enabled MySQL traditional"
 
744
                          " mode, which means silent data corruption may"
 
745
                          " occur. Please encourage the application"
 
746
                          " developers to enable this mode."))
675
747
    elif 'sqlite' in connection_dict.drivername:
676
748
        if not CONF.sqlite_synchronous:
677
749
            sqlalchemy.event.listen(engine, 'connect',
693
765
            remaining = 'infinite'
694
766
        while True:
695
767
            msg = _('SQL connection failed. %s attempts left.')
696
 
            LOG.warn(msg % remaining)
 
768
            LOG.warning(msg % remaining)
697
769
            if remaining != 'infinite':
698
770
                remaining -= 1
699
771
            time.sleep(CONF.database.retry_interval)