1
# -*- test-case-name: twisted.test.test_adbapi -*-
2
# Copyright (c) 2001-2008 Twisted Matrix Laboratories.
3
# See LICENSE for details.
6
An asynchronous mapping to U{DB-API 2.0<http://www.python.org/topics/database/DatabaseAPI-2.0.html>}.
11
from twisted.internet import threads
12
from twisted.python import reflect, log
13
from twisted.python.deprecate import deprecated
14
from twisted.python.versions import Version
18
class ConnectionLost(Exception):
20
This exception means that a db connection has been lost. Client code may
26
class Connection(object):
28
A wrapper for a DB-API connection instance.
30
The wrapper passes almost everything to the wrapped connection and so has
31
the same API. However, the Connection knows about its pool and also
32
handle reconnecting should when the real connection dies.
35
def __init__(self, pool):
37
self._connection = None
41
# The way adbapi works right now means that closing a connection is
42
# a really bad thing as it leaves a dead connection associated with
43
# a thread in the thread pool.
44
# Really, I think closing a pooled connection should return it to the
45
# pool but that's handled by the runWithConnection method already so,
46
# rather than upsetting anyone by raising an exception, let's ignore
51
if not self._pool.reconnect:
52
self._connection.rollback()
56
self._connection.rollback()
57
curs = self._connection.cursor()
58
curs.execute(self._pool.good_sql)
60
self._connection.commit()
63
log.err(None, "Rollback failed")
65
self._pool.disconnect(self._connection)
68
log.msg("Connection lost.")
70
raise ConnectionLost()
73
if self._connection is not None:
74
self._pool.disconnect(self._connection)
75
self._connection = self._pool.connect()
77
def __getattr__(self, name):
78
return getattr(self._connection, name)
82
"""A lightweight wrapper for a DB-API 'cursor' object.
84
Relays attribute access to the DB cursor. That is, you can call
85
execute(), fetchall(), etc., and they will be called on the
86
underlying DB-API cursor object. Attributes will also be
91
def __init__(self, pool, connection):
93
self._connection = connection
97
_cursor = self._cursor
102
if self._cursor is not None:
106
self._cursor = self._connection.cursor()
109
if not self._pool.reconnect:
112
log.err(None, "Cursor creation failed")
115
log.msg('Connection lost, reconnecting')
118
self._cursor = self._connection.cursor()
121
self._connection.reconnect()
124
def __getattr__(self, name):
125
return getattr(self._cursor, name)
128
class ConnectionPool:
130
Represent a pool of connections to a DB-API 2.0 compliant database.
132
@ivar connectionFactory: factory for connections, default to L{Connection}.
133
@type connectionFactory: any callable.
135
@ivar transactionFactory: factory for transactions, default to
137
@type transactionFactory: any callable
140
CP_ARGS = "min max name noisy openfun reconnect good_sql".split()
142
noisy = False # if true, generate informational log messages
143
min = 3 # minimum number of connections in pool
144
max = 5 # maximum number of connections in pool
145
name = None # Name to assign to thread pool for debugging
146
openfun = None # A function to call on new connections
147
reconnect = False # reconnect when connections fail
148
good_sql = 'select 1' # a query which should always succeed
150
running = False # true when the pool is operating
151
connectionFactory = Connection
152
transactionFactory = Transaction
154
def __init__(self, dbapiName, *connargs, **connkw):
155
"""Create a new ConnectionPool.
157
Any positional or keyword arguments other than those documented here
158
are passed to the DB-API object when connecting. Use these arguments to
159
pass database names, usernames, passwords, etc.
161
@param dbapiName: an import string to use to obtain a DB-API compatible
162
module (e.g. 'pyPgSQL.PgSQL')
164
@param cp_min: the minimum number of connections in pool (default 3)
166
@param cp_max: the maximum number of connections in pool (default 5)
168
@param cp_noisy: generate informational log messages during operation
171
@param cp_openfun: a callback invoked after every connect() on the
172
underlying DB-API object. The callback is passed a
173
new DB-API connection object. This callback can
174
setup per-connection state such as charset,
177
@param cp_reconnect: detect connections which have failed and reconnect
178
(default False). Failed connections may result in
179
ConnectionLost exceptions, which indicate the
180
query may need to be re-sent.
182
@param cp_good_sql: an sql query which should always succeed and change
183
no state (default 'select 1')
186
self.dbapiName = dbapiName
187
self.dbapi = reflect.namedModule(dbapiName)
189
if getattr(self.dbapi, 'apilevel', None) != '2.0':
190
log.msg('DB API module not DB API 2.0 compliant.')
192
if getattr(self.dbapi, 'threadsafety', 0) < 1:
193
log.msg('DB API module not sufficiently thread-safe.')
195
self.connargs = connargs
198
for arg in self.CP_ARGS:
199
cp_arg = 'cp_%s' % arg
200
if connkw.has_key(cp_arg):
201
setattr(self, arg, connkw[cp_arg])
204
self.min = min(self.min, self.max)
205
self.max = max(self.min, self.max)
207
self.connections = {} # all connections, hashed on thread id
209
# these are optional so import them here
210
from twisted.python import threadpool
213
self.threadID = thread.get_ident
214
self.threadpool = threadpool.ThreadPool(self.min, self.max)
216
from twisted.internet import reactor
217
self.startID = reactor.callWhenRunning(self._start)
224
"""Start the connection pool.
226
If you are using the reactor normally, this function does *not*
231
from twisted.internet import reactor
232
self.threadpool.start()
233
self.shutdownID = reactor.addSystemEventTrigger('during',
239
def runWithConnection(self, func, *args, **kw):
241
Execute a function with a database connection and return the result.
243
@param func: A callable object of one argument which will be executed
244
in a thread with a connection from the pool. It will be passed as
245
its first argument a L{Connection} instance (whose interface is
246
mostly identical to that of a connection object for your DB-API
247
module of choice), and its results will be returned as a Deferred.
248
If the method raises an exception the transaction will be rolled
249
back. Otherwise, the transaction will be committed. B{Note} that
250
this function is B{not} run in the main thread: it must be
253
@param *args: positional arguments to be passed to func
255
@param **kw: keyword arguments to be passed to func
257
@return: a Deferred which will fire the return value of
258
C{func(Transaction(...), *args, **kw)}, or a Failure.
260
from twisted.internet import reactor
261
return threads.deferToThreadPool(reactor, self.threadpool,
262
self._runWithConnection,
266
def _runWithConnection(self, func, *args, **kw):
267
conn = self.connectionFactory(self)
269
result = func(conn, *args, **kw)
273
excType, excValue, excTraceback = sys.exc_info()
277
log.err(None, "Rollback failed")
278
raise excType, excValue, excTraceback
281
def runInteraction(self, interaction, *args, **kw):
283
Interact with the database and return the result.
285
The 'interaction' is a callable object which will be executed
286
in a thread using a pooled connection. It will be passed an
287
L{Transaction} object as an argument (whose interface is
288
identical to that of the database cursor for your DB-API
289
module of choice), and its results will be returned as a
290
Deferred. If running the method raises an exception, the
291
transaction will be rolled back. If the method returns a
292
value, the transaction will be committed.
294
NOTE that the function you pass is *not* run in the main
295
thread: you may have to worry about thread-safety in the
296
function you pass to this if it tries to use non-local
299
@param interaction: a callable object whose first argument
300
is an L{adbapi.Transaction}.
302
@param *args: additional positional arguments to be passed
305
@param **kw: keyword arguments to be passed to interaction
307
@return: a Deferred which will fire the return value of
308
'interaction(Transaction(...), *args, **kw)', or a Failure.
310
from twisted.internet import reactor
311
return threads.deferToThreadPool(reactor, self.threadpool,
312
self._runInteraction,
313
interaction, *args, **kw)
316
def runQuery(self, *args, **kw):
317
"""Execute an SQL query and return the result.
319
A DB-API cursor will will be invoked with cursor.execute(*args, **kw).
320
The exact nature of the arguments will depend on the specific flavor
321
of DB-API being used, but the first argument in *args be an SQL
322
statement. The result of a subsequent cursor.fetchall() will be
323
fired to the Deferred which is returned. If either the 'execute' or
324
'fetchall' methods raise an exception, the transaction will be rolled
325
back and a Failure returned.
327
The *args and **kw arguments will be passed to the DB-API cursor's
330
@return: a Deferred which will fire the return value of a DB-API
331
cursor's 'fetchall' method, or a Failure.
333
return self.runInteraction(self._runQuery, *args, **kw)
336
def runOperation(self, *args, **kw):
337
"""Execute an SQL query and return None.
339
A DB-API cursor will will be invoked with cursor.execute(*args, **kw).
340
The exact nature of the arguments will depend on the specific flavor
341
of DB-API being used, but the first argument in *args will be an SQL
342
statement. This method will not attempt to fetch any results from the
343
query and is thus suitable for INSERT, DELETE, and other SQL statements
344
which do not return values. If the 'execute' method raises an
345
exception, the transaction will be rolled back and a Failure returned.
347
The args and kw arguments will be passed to the DB-API cursor's
350
return: a Deferred which will fire None or a Failure.
352
return self.runInteraction(self._runOperation, *args, **kw)
356
"""Close all pool connections and shutdown the pool."""
358
from twisted.internet import reactor
360
reactor.removeSystemEventTrigger(self.shutdownID)
361
self.shutdownID = None
363
reactor.removeSystemEventTrigger(self.startID)
367
def finalClose(self):
368
"""This should only be called by the shutdown trigger."""
370
self.shutdownID = None
371
self.threadpool.stop()
373
for conn in self.connections.values():
375
self.connections.clear()
378
"""Return a database connection when one becomes available.
380
This method blocks and should be run in a thread from the internal
381
threadpool. Don't call this method directly from non-threaded code.
382
Using this method outside the external threadpool may exceed the
383
maximum number of connections in the pool.
385
@return: a database connection from the pool.
388
tid = self.threadID()
389
conn = self.connections.get(tid)
392
log.msg('adbapi connecting: %s %s%s' % (self.dbapiName,
395
conn = self.dbapi.connect(*self.connargs, **self.connkw)
396
if self.openfun != None:
398
self.connections[tid] = conn
401
def disconnect(self, conn):
402
"""Disconnect a database connection associated with this pool.
404
Note: This function should only be used by the same thread which
405
called connect(). As with connect(), this function is not used
406
in normal non-threaded twisted code.
408
tid = self.threadID()
409
if conn is not self.connections.get(tid):
410
raise Exception("wrong connection for thread")
413
del self.connections[tid]
416
def _close(self, conn):
418
log.msg('adbapi closing: %s' % (self.dbapiName,))
422
log.err(None, "Connection close failed")
425
def _runInteraction(self, interaction, *args, **kw):
426
conn = self.connectionFactory(self)
427
trans = self.transactionFactory(self, conn)
429
result = interaction(trans, *args, **kw)
434
excType, excValue, excTraceback = sys.exc_info()
438
log.err(None, "Rollback failed")
439
raise excType, excValue, excTraceback
442
def _runQuery(self, trans, *args, **kw):
443
trans.execute(*args, **kw)
444
return trans.fetchall()
446
def _runOperation(self, trans, *args, **kw):
447
trans.execute(*args, **kw)
449
def __getstate__(self):
450
return {'dbapiName': self.dbapiName,
454
'reconnect': self.reconnect,
455
'good_sql': self.good_sql,
456
'connargs': self.connargs,
457
'connkw': self.connkw}
459
def __setstate__(self, state):
460
self.__dict__ = state
461
self.__init__(self.dbapiName, *self.connargs, **self.connkw)
465
# Common deprecation decorator used for all deprecations.
466
_unreleasedVersion = Version("Twisted", 8, 0, 0)
467
_unreleasedDeprecation = deprecated(_unreleasedVersion)
473
Something really stupid that replaces quotes with escaped quotes.
475
return text.replace("'", "''").replace("\\", "\\\\")
481
Make a string safe to include in an SQL statement.
485
safe = _unreleasedDeprecation(safe)
488
__all__ = ['Transaction', 'ConnectionPool', 'safe']