~justin-fathomdb/nova/justinsb-openstack-api-volumes

« back to all changes in this revision

Viewing changes to vendor/Twisted-10.0.0/twisted/enterprise/adbapi.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# -*- test-case-name: twisted.test.test_adbapi -*-
 
2
# Copyright (c) 2001-2008 Twisted Matrix Laboratories.
 
3
# See LICENSE for details.
 
4
 
 
5
"""
 
6
An asynchronous mapping to U{DB-API 2.0<http://www.python.org/topics/database/DatabaseAPI-2.0.html>}.
 
7
"""
 
8
 
 
9
import sys
 
10
 
 
11
from twisted.internet import threads
 
12
from twisted.python import reflect, log
 
13
from twisted.python.deprecate import deprecated
 
14
from twisted.python.versions import Version
 
15
 
 
16
 
 
17
 
 
18
class ConnectionLost(Exception):
 
19
    """
 
20
    This exception means that a db connection has been lost.  Client code may
 
21
    try again.
 
22
    """
 
23
 
 
24
 
 
25
 
 
26
class Connection(object):
 
27
    """
 
28
    A wrapper for a DB-API connection instance.
 
29
 
 
30
    The wrapper passes almost everything to the wrapped connection and so has
 
31
    the same API. However, the Connection knows about its pool and also
 
32
    handle reconnecting should when the real connection dies.
 
33
    """
 
34
 
 
35
    def __init__(self, pool):
 
36
        self._pool = pool
 
37
        self._connection = None
 
38
        self.reconnect()
 
39
 
 
40
    def close(self):
 
41
        # The way adbapi works right now means that closing a connection is
 
42
        # a really bad thing  as it leaves a dead connection associated with
 
43
        # a thread in the thread pool.
 
44
        # Really, I think closing a pooled connection should return it to the
 
45
        # pool but that's handled by the runWithConnection method already so,
 
46
        # rather than upsetting anyone by raising an exception, let's ignore
 
47
        # the request
 
48
        pass
 
49
 
 
50
    def rollback(self):
 
51
        if not self._pool.reconnect:
 
52
            self._connection.rollback()
 
53
            return
 
54
 
 
55
        try:
 
56
            self._connection.rollback()
 
57
            curs = self._connection.cursor()
 
58
            curs.execute(self._pool.good_sql)
 
59
            curs.close()
 
60
            self._connection.commit()
 
61
            return
 
62
        except:
 
63
            log.err(None, "Rollback failed")
 
64
 
 
65
        self._pool.disconnect(self._connection)
 
66
 
 
67
        if self._pool.noisy:
 
68
            log.msg("Connection lost.")
 
69
 
 
70
        raise ConnectionLost()
 
71
 
 
72
    def reconnect(self):
 
73
        if self._connection is not None:
 
74
            self._pool.disconnect(self._connection)
 
75
        self._connection = self._pool.connect()
 
76
 
 
77
    def __getattr__(self, name):
 
78
        return getattr(self._connection, name)
 
79
 
 
80
 
 
81
class Transaction:
 
82
    """A lightweight wrapper for a DB-API 'cursor' object.
 
83
 
 
84
    Relays attribute access to the DB cursor. That is, you can call
 
85
    execute(), fetchall(), etc., and they will be called on the
 
86
    underlying DB-API cursor object. Attributes will also be
 
87
    retrieved from there.
 
88
    """
 
89
    _cursor = None
 
90
 
 
91
    def __init__(self, pool, connection):
 
92
        self._pool = pool
 
93
        self._connection = connection
 
94
        self.reopen()
 
95
 
 
96
    def close(self):
 
97
        _cursor = self._cursor
 
98
        self._cursor = None
 
99
        _cursor.close()
 
100
 
 
101
    def reopen(self):
 
102
        if self._cursor is not None:
 
103
            self.close()
 
104
 
 
105
        try:
 
106
            self._cursor = self._connection.cursor()
 
107
            return
 
108
        except:
 
109
            if not self._pool.reconnect:
 
110
                raise
 
111
            else:
 
112
                log.err(None, "Cursor creation failed")
 
113
 
 
114
        if self._pool.noisy:
 
115
            log.msg('Connection lost, reconnecting')
 
116
 
 
117
        self.reconnect()
 
118
        self._cursor = self._connection.cursor()
 
119
 
 
120
    def reconnect(self):
 
121
        self._connection.reconnect()
 
122
        self._cursor = None
 
123
 
 
124
    def __getattr__(self, name):
 
125
        return getattr(self._cursor, name)
 
126
 
 
127
 
 
128
class ConnectionPool:
 
129
    """
 
130
    Represent a pool of connections to a DB-API 2.0 compliant database.
 
131
 
 
132
    @ivar connectionFactory: factory for connections, default to L{Connection}.
 
133
    @type connectionFactory: any callable.
 
134
 
 
135
    @ivar transactionFactory: factory for transactions, default to
 
136
        L{Transaction}.
 
137
    @type transactionFactory: any callable
 
138
    """
 
139
 
 
140
    CP_ARGS = "min max name noisy openfun reconnect good_sql".split()
 
141
 
 
142
    noisy = False # if true, generate informational log messages
 
143
    min = 3 # minimum number of connections in pool
 
144
    max = 5 # maximum number of connections in pool
 
145
    name = None # Name to assign to thread pool for debugging
 
146
    openfun = None # A function to call on new connections
 
147
    reconnect = False # reconnect when connections fail
 
148
    good_sql = 'select 1' # a query which should always succeed
 
149
 
 
150
    running = False # true when the pool is operating
 
151
    connectionFactory = Connection
 
152
    transactionFactory = Transaction
 
153
 
 
154
    def __init__(self, dbapiName, *connargs, **connkw):
 
155
        """Create a new ConnectionPool.
 
156
 
 
157
        Any positional or keyword arguments other than those documented here
 
158
        are passed to the DB-API object when connecting. Use these arguments to
 
159
        pass database names, usernames, passwords, etc.
 
160
 
 
161
        @param dbapiName: an import string to use to obtain a DB-API compatible
 
162
                          module (e.g. 'pyPgSQL.PgSQL')
 
163
 
 
164
        @param cp_min: the minimum number of connections in pool (default 3)
 
165
 
 
166
        @param cp_max: the maximum number of connections in pool (default 5)
 
167
 
 
168
        @param cp_noisy: generate informational log messages during operation
 
169
                         (default False)
 
170
 
 
171
        @param cp_openfun: a callback invoked after every connect() on the
 
172
                           underlying DB-API object. The callback is passed a
 
173
                           new DB-API connection object.  This callback can
 
174
                           setup per-connection state such as charset,
 
175
                           timezone, etc.
 
176
 
 
177
        @param cp_reconnect: detect connections which have failed and reconnect
 
178
                             (default False). Failed connections may result in
 
179
                             ConnectionLost exceptions, which indicate the
 
180
                             query may need to be re-sent.
 
181
 
 
182
        @param cp_good_sql: an sql query which should always succeed and change
 
183
                            no state (default 'select 1')
 
184
        """
 
185
 
 
186
        self.dbapiName = dbapiName
 
187
        self.dbapi = reflect.namedModule(dbapiName)
 
188
 
 
189
        if getattr(self.dbapi, 'apilevel', None) != '2.0':
 
190
            log.msg('DB API module not DB API 2.0 compliant.')
 
191
 
 
192
        if getattr(self.dbapi, 'threadsafety', 0) < 1:
 
193
            log.msg('DB API module not sufficiently thread-safe.')
 
194
 
 
195
        self.connargs = connargs
 
196
        self.connkw = connkw
 
197
 
 
198
        for arg in self.CP_ARGS:
 
199
            cp_arg = 'cp_%s' % arg
 
200
            if connkw.has_key(cp_arg):
 
201
                setattr(self, arg, connkw[cp_arg])
 
202
                del connkw[cp_arg]
 
203
 
 
204
        self.min = min(self.min, self.max)
 
205
        self.max = max(self.min, self.max)
 
206
 
 
207
        self.connections = {}  # all connections, hashed on thread id
 
208
 
 
209
        # these are optional so import them here
 
210
        from twisted.python import threadpool
 
211
        import thread
 
212
 
 
213
        self.threadID = thread.get_ident
 
214
        self.threadpool = threadpool.ThreadPool(self.min, self.max)
 
215
 
 
216
        from twisted.internet import reactor
 
217
        self.startID = reactor.callWhenRunning(self._start)
 
218
 
 
219
    def _start(self):
 
220
        self.startID = None
 
221
        return self.start()
 
222
 
 
223
    def start(self):
 
224
        """Start the connection pool.
 
225
 
 
226
        If you are using the reactor normally, this function does *not*
 
227
        need to be called.
 
228
        """
 
229
 
 
230
        if not self.running:
 
231
            from twisted.internet import reactor
 
232
            self.threadpool.start()
 
233
            self.shutdownID = reactor.addSystemEventTrigger('during',
 
234
                                                            'shutdown',
 
235
                                                            self.finalClose)
 
236
            self.running = True
 
237
 
 
238
 
 
239
    def runWithConnection(self, func, *args, **kw):
 
240
        """
 
241
        Execute a function with a database connection and return the result.
 
242
 
 
243
        @param func: A callable object of one argument which will be executed
 
244
            in a thread with a connection from the pool.  It will be passed as
 
245
            its first argument a L{Connection} instance (whose interface is
 
246
            mostly identical to that of a connection object for your DB-API
 
247
            module of choice), and its results will be returned as a Deferred.
 
248
            If the method raises an exception the transaction will be rolled
 
249
            back.  Otherwise, the transaction will be committed.  B{Note} that
 
250
            this function is B{not} run in the main thread: it must be
 
251
            threadsafe.
 
252
 
 
253
        @param *args: positional arguments to be passed to func
 
254
 
 
255
        @param **kw: keyword arguments to be passed to func
 
256
 
 
257
        @return: a Deferred which will fire the return value of
 
258
            C{func(Transaction(...), *args, **kw)}, or a Failure.
 
259
        """
 
260
        from twisted.internet import reactor
 
261
        return threads.deferToThreadPool(reactor, self.threadpool,
 
262
                                         self._runWithConnection,
 
263
                                         func, *args, **kw)
 
264
 
 
265
 
 
266
    def _runWithConnection(self, func, *args, **kw):
 
267
        conn = self.connectionFactory(self)
 
268
        try:
 
269
            result = func(conn, *args, **kw)
 
270
            conn.commit()
 
271
            return result
 
272
        except:
 
273
            excType, excValue, excTraceback = sys.exc_info()
 
274
            try:
 
275
                conn.rollback()
 
276
            except:
 
277
                log.err(None, "Rollback failed")
 
278
            raise excType, excValue, excTraceback
 
279
 
 
280
 
 
281
    def runInteraction(self, interaction, *args, **kw):
 
282
        """
 
283
        Interact with the database and return the result.
 
284
 
 
285
        The 'interaction' is a callable object which will be executed
 
286
        in a thread using a pooled connection. It will be passed an
 
287
        L{Transaction} object as an argument (whose interface is
 
288
        identical to that of the database cursor for your DB-API
 
289
        module of choice), and its results will be returned as a
 
290
        Deferred. If running the method raises an exception, the
 
291
        transaction will be rolled back. If the method returns a
 
292
        value, the transaction will be committed.
 
293
 
 
294
        NOTE that the function you pass is *not* run in the main
 
295
        thread: you may have to worry about thread-safety in the
 
296
        function you pass to this if it tries to use non-local
 
297
        objects.
 
298
 
 
299
        @param interaction: a callable object whose first argument
 
300
            is an L{adbapi.Transaction}.
 
301
 
 
302
        @param *args: additional positional arguments to be passed
 
303
            to interaction
 
304
 
 
305
        @param **kw: keyword arguments to be passed to interaction
 
306
 
 
307
        @return: a Deferred which will fire the return value of
 
308
            'interaction(Transaction(...), *args, **kw)', or a Failure.
 
309
        """
 
310
        from twisted.internet import reactor
 
311
        return threads.deferToThreadPool(reactor, self.threadpool,
 
312
                                         self._runInteraction,
 
313
                                         interaction, *args, **kw)
 
314
 
 
315
 
 
316
    def runQuery(self, *args, **kw):
 
317
        """Execute an SQL query and return the result.
 
318
 
 
319
        A DB-API cursor will will be invoked with cursor.execute(*args, **kw).
 
320
        The exact nature of the arguments will depend on the specific flavor
 
321
        of DB-API being used, but the first argument in *args be an SQL
 
322
        statement. The result of a subsequent cursor.fetchall() will be
 
323
        fired to the Deferred which is returned. If either the 'execute' or
 
324
        'fetchall' methods raise an exception, the transaction will be rolled
 
325
        back and a Failure returned.
 
326
 
 
327
        The  *args and **kw arguments will be passed to the DB-API cursor's
 
328
        'execute' method.
 
329
 
 
330
        @return: a Deferred which will fire the return value of a DB-API
 
331
        cursor's 'fetchall' method, or a Failure.
 
332
        """
 
333
        return self.runInteraction(self._runQuery, *args, **kw)
 
334
 
 
335
 
 
336
    def runOperation(self, *args, **kw):
 
337
        """Execute an SQL query and return None.
 
338
 
 
339
        A DB-API cursor will will be invoked with cursor.execute(*args, **kw).
 
340
        The exact nature of the arguments will depend on the specific flavor
 
341
        of DB-API being used, but the first argument in *args will be an SQL
 
342
        statement. This method will not attempt to fetch any results from the
 
343
        query and is thus suitable for INSERT, DELETE, and other SQL statements
 
344
        which do not return values. If the 'execute' method raises an
 
345
        exception, the transaction will be rolled back and a Failure returned.
 
346
 
 
347
        The args and kw arguments will be passed to the DB-API cursor's
 
348
        'execute' method.
 
349
 
 
350
        return: a Deferred which will fire None or a Failure.
 
351
        """
 
352
        return self.runInteraction(self._runOperation, *args, **kw)
 
353
 
 
354
 
 
355
    def close(self):
 
356
        """Close all pool connections and shutdown the pool."""
 
357
 
 
358
        from twisted.internet import reactor
 
359
        if self.shutdownID:
 
360
            reactor.removeSystemEventTrigger(self.shutdownID)
 
361
            self.shutdownID = None
 
362
        if self.startID:
 
363
            reactor.removeSystemEventTrigger(self.startID)
 
364
            self.startID = None
 
365
        self.finalClose()
 
366
 
 
367
    def finalClose(self):
 
368
        """This should only be called by the shutdown trigger."""
 
369
 
 
370
        self.shutdownID = None
 
371
        self.threadpool.stop()
 
372
        self.running = False
 
373
        for conn in self.connections.values():
 
374
            self._close(conn)
 
375
        self.connections.clear()
 
376
 
 
377
    def connect(self):
 
378
        """Return a database connection when one becomes available.
 
379
 
 
380
        This method blocks and should be run in a thread from the internal
 
381
        threadpool. Don't call this method directly from non-threaded code.
 
382
        Using this method outside the external threadpool may exceed the
 
383
        maximum number of connections in the pool.
 
384
 
 
385
        @return: a database connection from the pool.
 
386
        """
 
387
 
 
388
        tid = self.threadID()
 
389
        conn = self.connections.get(tid)
 
390
        if conn is None:
 
391
            if self.noisy:
 
392
                log.msg('adbapi connecting: %s %s%s' % (self.dbapiName,
 
393
                                                        self.connargs or '',
 
394
                                                        self.connkw or ''))
 
395
            conn = self.dbapi.connect(*self.connargs, **self.connkw)
 
396
            if self.openfun != None:
 
397
                self.openfun(conn)
 
398
            self.connections[tid] = conn
 
399
        return conn
 
400
 
 
401
    def disconnect(self, conn):
 
402
        """Disconnect a database connection associated with this pool.
 
403
 
 
404
        Note: This function should only be used by the same thread which
 
405
        called connect(). As with connect(), this function is not used
 
406
        in normal non-threaded twisted code.
 
407
        """
 
408
        tid = self.threadID()
 
409
        if conn is not self.connections.get(tid):
 
410
            raise Exception("wrong connection for thread")
 
411
        if conn is not None:
 
412
            self._close(conn)
 
413
            del self.connections[tid]
 
414
 
 
415
 
 
416
    def _close(self, conn):
 
417
        if self.noisy:
 
418
            log.msg('adbapi closing: %s' % (self.dbapiName,))
 
419
        try:
 
420
            conn.close()
 
421
        except:
 
422
            log.err(None, "Connection close failed")
 
423
 
 
424
 
 
425
    def _runInteraction(self, interaction, *args, **kw):
 
426
        conn = self.connectionFactory(self)
 
427
        trans = self.transactionFactory(self, conn)
 
428
        try:
 
429
            result = interaction(trans, *args, **kw)
 
430
            trans.close()
 
431
            conn.commit()
 
432
            return result
 
433
        except:
 
434
            excType, excValue, excTraceback = sys.exc_info()
 
435
            try:
 
436
                conn.rollback()
 
437
            except:
 
438
                log.err(None, "Rollback failed")
 
439
            raise excType, excValue, excTraceback
 
440
 
 
441
 
 
442
    def _runQuery(self, trans, *args, **kw):
 
443
        trans.execute(*args, **kw)
 
444
        return trans.fetchall()
 
445
 
 
446
    def _runOperation(self, trans, *args, **kw):
 
447
        trans.execute(*args, **kw)
 
448
 
 
449
    def __getstate__(self):
 
450
        return {'dbapiName': self.dbapiName,
 
451
                'min': self.min,
 
452
                'max': self.max,
 
453
                'noisy': self.noisy,
 
454
                'reconnect': self.reconnect,
 
455
                'good_sql': self.good_sql,
 
456
                'connargs': self.connargs,
 
457
                'connkw': self.connkw}
 
458
 
 
459
    def __setstate__(self, state):
 
460
        self.__dict__ = state
 
461
        self.__init__(self.dbapiName, *self.connargs, **self.connkw)
 
462
 
 
463
 
 
464
 
 
465
# Common deprecation decorator used for all deprecations.
 
466
_unreleasedVersion = Version("Twisted", 8, 0, 0)
 
467
_unreleasedDeprecation = deprecated(_unreleasedVersion)
 
468
 
 
469
 
 
470
 
 
471
def _safe(text):
 
472
    """
 
473
    Something really stupid that replaces quotes with escaped quotes.
 
474
    """
 
475
    return text.replace("'", "''").replace("\\", "\\\\")
 
476
 
 
477
 
 
478
 
 
479
def safe(text):
 
480
    """
 
481
    Make a string safe to include in an SQL statement.
 
482
    """
 
483
    return _safe(text)
 
484
 
 
485
safe = _unreleasedDeprecation(safe)
 
486
 
 
487
 
 
488
__all__ = ['Transaction', 'ConnectionPool', 'safe']