1
1
# postgresql/psycopg2.py
2
# Copyright (C) 2005-2012 the SQLAlchemy authors and contributors <see AUTHORS file>
2
# Copyright (C) 2005-2013 the SQLAlchemy authors and contributors <see AUTHORS file>
4
4
# This module is part of SQLAlchemy and is released under
5
5
# the MIT License: http://www.opensource.org/licenses/mit-license.php
7
"""Support for the PostgreSQL database via the psycopg2 driver.
12
The psycopg2 driver is available at http://pypi.python.org/pypi/psycopg2/ .
13
The dialect has several behaviors which are specifically tailored towards compatibility
16
Note that psycopg1 is **not** supported.
22
``postgresql+psycopg2://user:password@host:port/dbname[?key=value&key=value...]``.
8
.. dialect:: postgresql+psycopg2
11
:connectstring: postgresql+psycopg2://user:password@host:port/dbname[?key=value&key=value...]
12
:url: http://pypi.python.org/pypi/psycopg2/
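As an illustration of the connect string format above, here is a minimal sketch that assembles such a URL using only the standard library. The helper name ``build_psycopg2_url``, the sample credentials and the ``sslmode`` query key are hypothetical and are not part of SQLAlchemy; percent-quoting the user name and password is an assumption about how special characters are usually carried in a URL.

```python
from urllib.parse import quote_plus, urlencode


def build_psycopg2_url(user, password, host, port, dbname, **query):
    """Assemble a postgresql+psycopg2 URL (hypothetical helper, not SQLAlchemy API)."""
    url = "postgresql+psycopg2://%s:%s@%s:%d/%s" % (
        quote_plus(user), quote_plus(password), host, port, dbname)
    if query:
        # Optional key=value pairs follow the "?" separator.
        url += "?" + urlencode(sorted(query.items()))
    return url


# Sample values only; none of these come from the module itself.
url = build_psycopg2_url("scott", "t!ger", "localhost", 5432, "test",
                         sslmode="require")
```

The resulting string is the kind of value that would be handed to :func:`.create_engine`.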
14
psycopg2 Connect Arguments
15
-----------------------------------
24
17
psycopg2-specific keyword arguments which are accepted by
25
18
:func:`.create_engine()` are:
27
* *server_side_cursors* - Enable the usage of "server side cursors" for SQL
20
* ``server_side_cursors``: Enable the usage of "server side cursors" for SQL
28
21
statements which support this feature. What this essentially means from a
29
22
psycopg2 point of view is that the cursor is created using a name, e.g.
30
23
``connection.cursor('some name')``, which has the effect that result rows are
31
24
not immediately pre-fetched and buffered after statement execution, but are
32
25
instead left on the server and only retrieved as needed. SQLAlchemy's
33
:class:`~sqlalchemy.engine.base.ResultProxy` uses special row-buffering
26
:class:`~sqlalchemy.engine.ResultProxy` uses special row-buffering
34
27
behavior when this feature is enabled, such that groups of 100 rows at a
35
28
time are fetched over the wire to reduce conversational overhead.
36
29
Note that the ``stream_results=True`` execution option is a more targeted
37
30
way of enabling this mode on a per-execution basis.
38
* *use_native_unicode* - Enable the usage of Psycopg2 "native unicode" mode
39
per connection. True by default.
31
* ``use_native_unicode``: Enable the usage of Psycopg2 "native unicode" mode
32
per connection. True by default.
33
* ``isolation_level``: This option, available for all PostgreSQL dialects,
34
includes the ``AUTOCOMMIT`` isolation level when using the psycopg2
35
dialect. See :ref:`psycopg2_isolation_level`.
41
38
Unix Domain Connections
42
39
------------------------
66
63
:meth:`.Query.execution_options`, in addition to those not specific to DBAPIs:
68
65
* isolation_level - Set the transaction isolation level for the lifespan of a
69
:class:`.Connection` (can only be set on a connection, not a statement or query).
70
This includes the options ``SERIALIZABLE``, ``READ COMMITTED``,
71
``READ UNCOMMITTED`` and ``REPEATABLE READ``.
72
* stream_results - Enable or disable usage of server side cursors.
73
If ``None`` or not set, the ``server_side_cursors`` option of the :class:`.Engine` is used.
66
:class:`.Connection` (can only be set on a connection, not a statement
67
or query). See :ref:`psycopg2_isolation_level`.
69
* stream_results - Enable or disable usage of psycopg2 server side cursors -
70
this feature makes use of "named" cursors in combination with special
71
result handling methods so that result rows are not fully buffered.
72
If ``None`` or not set, the ``server_side_cursors`` option of the
73
:class:`.Engine` is used.
98
98
This overrides the encoding specified in the Postgresql client configuration.
100
100
.. versionadded:: 0.7.3
101
The psycopg2-specific ``client_encoding`` parameter to :func:`.create_engine`.
101
The psycopg2-specific ``client_encoding`` parameter to
102
:func:`.create_engine`.
103
104
SQLAlchemy can also be instructed to skip the usage of the psycopg2
104
105
``UNICODE`` extension and to instead utilize its own unicode encode/decode
105
106
services, which are normally reserved only for those DBAPIs that don't
106
fully support unicode directly. Passing ``use_native_unicode=False``
107
to :func:`.create_engine` will disable usage of ``psycopg2.extensions.UNICODE``.
107
fully support unicode directly. Passing ``use_native_unicode=False`` to
108
:func:`.create_engine` will disable usage of ``psycopg2.extensions.UNICODE``.
108
109
SQLAlchemy will instead encode data itself into Python bytestrings on the way
109
110
in and coerce from bytes on the way back,
110
111
using the value of the :func:`.create_engine` ``encoding`` parameter, which
119
120
The psycopg2 dialect fully supports SAVEPOINT and two-phase commit operations.
121
.. _psycopg2_isolation:
123
Transaction Isolation Level
124
---------------------------
126
The ``isolation_level`` parameter of :func:`.create_engine` here makes use
122
.. _psycopg2_isolation_level:
124
Psycopg2 Transaction Isolation Level
125
-------------------------------------
127
As discussed in :ref:`postgresql_isolation_level`,
128
all Postgresql dialects support setting of transaction isolation level
129
both via the ``isolation_level`` parameter passed to :func:`.create_engine`,
130
as well as the ``isolation_level`` argument used by :meth:`.Connection.execution_options`.
131
When using the psycopg2 dialect, these options make use of
127
132
psycopg2's ``set_isolation_level()`` connection method, rather than
128
issuing a ``SET SESSION CHARACTERISTICS`` command. This is because psycopg2
129
resets the isolation level on each new transaction, and needs to know
130
at the API level what level should be used.
133
emitting a Postgresql directive; this is because psycopg2's API-level
134
setting is always emitted at the start of each transaction in any case.
136
The psycopg2 dialect supports these constants for isolation level:
139
* ``READ UNCOMMITTED``
140
* ``REPEATABLE READ``
144
.. versionadded:: 0.8.2 support for AUTOCOMMIT isolation level when using
139
155
logging.getLogger('sqlalchemy.dialects.postgresql').setLevel(logging.INFO)
160
The psycopg2 dialect will make use of the
161
``psycopg2.extensions.register_hstore()`` extension when using the HSTORE
162
type. This replaces SQLAlchemy's pure-Python HSTORE coercion which takes
163
effect for other DBAPIs.
166
from __future__ import absolute_import
147
from sqlalchemy import util, exc
148
from sqlalchemy.util.compat import decimal
149
from sqlalchemy import processors
150
from sqlalchemy.engine import base
151
from sqlalchemy.sql import expression
152
from sqlalchemy import types as sqltypes
153
from sqlalchemy.dialects.postgresql.base import PGDialect, PGCompiler, \
170
from ... import util, exc
172
from ... import processors
173
from ...engine import result as _result
174
from ...sql import expression
175
from ... import types as sqltypes
176
from .base import PGDialect, PGCompiler, \
154
177
PGIdentifierPreparer, PGExecutionContext, \
155
178
ENUM, ARRAY, _DECIMAL_TYPES, _FLOAT_TYPES,\
180
from .hstore import HSTORE
159
183
logger = logging.getLogger('sqlalchemy.dialects.postgresql')
183
207
raise exc.InvalidRequestError(
184
208
"Unknown PG numeric type: %d" % coltype)
186
211
class _PGEnum(ENUM):
187
212
def __init__(self, *arg, **kw):
188
213
super(_PGEnum, self).__init__(*arg, **kw)
202
228
self.item_type.convert_unicode = "force"
232
class _PGHStore(HSTORE):
233
def bind_processor(self, dialect):
234
if dialect._has_native_hstore:
237
return super(_PGHStore, self).bind_processor(dialect)
239
def result_processor(self, dialect, coltype):
240
if dialect._has_native_hstore:
243
return super(_PGHStore, self).result_processor(dialect, coltype)
205
245
# When we're handed literal SQL, ensure it's a SELECT-query. Since
206
246
# 8.3, combining cursors and "FOR UPDATE" has been fine.
207
247
SERVER_SIDE_CURSOR_RE = re.compile(
223
264
(not self.compiled or
224
isinstance(self.compiled.statement, expression._TextClause))
265
isinstance(self.compiled.statement, expression.TextClause))
225
266
and self.statement and SERVER_SIDE_CURSOR_RE.match(self.statement))
229
is_server_side = self.execution_options.get('stream_results', False)
271
self.execution_options.get('stream_results', False)
231
273
self.__is_server_side = is_server_side
232
274
if is_server_side:
243
285
self._log_notices(self.cursor)
245
287
if self.__is_server_side:
246
return base.BufferedRowResultProxy(self)
288
return _result.BufferedRowResultProxy(self)
248
return base.ResultProxy(self)
290
return _result.ResultProxy(self)
250
292
def _log_notices(self, cursor):
251
293
for notice in cursor.connection.notices:
259
301
class PGCompiler_psycopg2(PGCompiler):
260
def visit_mod(self, binary, **kw):
261
return self.process(binary.left) + " %% " + self.process(binary.right)
302
def visit_mod_binary(self, binary, operator, **kw):
303
return self.process(binary.left, **kw) + " %% " + \
304
self.process(binary.right, **kw)
263
306
def post_process_text(self, text):
264
307
return text.replace('%', '%%')
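A rough illustration of why percent signs are doubled here: psycopg2 uses Python-style ``%`` placeholders for bound parameters, so a literal ``%`` in the SQL text has to be written as ``%%`` to survive substitution. The sketch below imitates that with plain string formatting and no database; ``escape_percent`` is a hypothetical stand-in for ``post_process_text``, and the sample statement is invented.

```python
def escape_percent(text):
    # Same replacement as post_process_text: double every literal "%".
    return text.replace('%', '%%')


raw = "SELECT 10 % 3 AS remainder WHERE name = "
statement = escape_percent(raw) + "%(name)s"

# Plain Python formatting stands in for the driver's parameter substitution;
# a real driver would also quote the value, which is done by hand here.
rendered = statement % {"name": "'ed'"}
```

After substitution the doubled ``%%`` collapses back into a single modulo operator, while ``%(name)s`` is consumed as a placeholder.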
281
325
preparer = PGIdentifierPreparer_psycopg2
282
326
psycopg2_version = (0, 0)
328
_has_native_hstore = False
284
330
colspecs = util.update_copy(
285
331
PGDialect.colspecs,
287
sqltypes.Numeric : _PGNumeric,
288
ENUM : _PGEnum, # needs force_unicode
289
sqltypes.Enum : _PGEnum, # needs force_unicode
290
ARRAY : _PGArray, # needs force_unicode
333
sqltypes.Numeric: _PGNumeric,
334
ENUM: _PGEnum, # needs force_unicode
335
sqltypes.Enum: _PGEnum, # needs force_unicode
336
ARRAY: _PGArray, # needs force_unicode
294
341
def __init__(self, server_side_cursors=False, use_native_unicode=True,
295
client_encoding=None, **kwargs):
342
client_encoding=None,
343
use_native_hstore=True,
296
345
PGDialect.__init__(self, **kwargs)
297
346
self.server_side_cursors = server_side_cursors
298
347
self.use_native_unicode = use_native_unicode
348
self.use_native_hstore = use_native_hstore
299
349
self.supports_unicode_binds = use_native_unicode
300
350
self.client_encoding = client_encoding
301
351
if self.dbapi and hasattr(self.dbapi, '__version__'):
307
357
for x in m.group(1, 2, 3)
308
358
if x is not None)
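The version handling above can be sketched in isolation roughly as follows. The regular expression is an assumption, since the pattern actually used is not visible in this excerpt, and ``parse_version`` is a hypothetical name; only the ``m.group(1, 2, 3)`` filtering and the ``(0, 0)`` default are taken from the code shown.

```python
import re


def parse_version(version_string):
    """Turn a string such as '2.4.5 (dt dec pq3 ext)' into a tuple of ints."""
    # Assumed pattern: major.minor with an optional third component.
    m = re.match(r'(\d+)\.(\d+)(?:\.(\d+))?', version_string)
    if m is None:
        # Mirrors the class-level default psycopg2_version = (0, 0).
        return (0, 0)
    # Drop the third group when the version has only two components.
    return tuple(int(x) for x in m.group(1, 2, 3) if x is not None)
```

Keeping the version as a tuple of integers allows comparisons such as ``psycopg2_version >= (2, 4)`` to behave numerically rather than lexically.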
360
def initialize(self, connection):
361
super(PGDialect_psycopg2, self).initialize(connection)
362
self._has_native_hstore = self.use_native_hstore and \
363
self._hstore_oids(connection.connection) \
312
psycopg = __import__('psycopg2')
315
371
@util.memoized_property
316
372
def _isolation_lookup(self):
317
373
extensions = __import__('psycopg2.extensions').extensions
319
'READ COMMITTED':extensions.ISOLATION_LEVEL_READ_COMMITTED,
320
'READ UNCOMMITTED':extensions.ISOLATION_LEVEL_READ_UNCOMMITTED,
321
'REPEATABLE READ':extensions.ISOLATION_LEVEL_REPEATABLE_READ,
322
'SERIALIZABLE':extensions.ISOLATION_LEVEL_SERIALIZABLE
375
'AUTOCOMMIT': extensions.ISOLATION_LEVEL_AUTOCOMMIT,
376
'READ COMMITTED': extensions.ISOLATION_LEVEL_READ_COMMITTED,
377
'READ UNCOMMITTED': extensions.ISOLATION_LEVEL_READ_UNCOMMITTED,
378
'REPEATABLE READ': extensions.ISOLATION_LEVEL_REPEATABLE_READ,
379
'SERIALIZABLE': extensions.ISOLATION_LEVEL_SERIALIZABLE
325
382
def set_isolation_level(self, connection, level):
347
406
fns.append(on_connect)
349
408
if self.dbapi and self.use_native_unicode:
350
extensions = __import__('psycopg2.extensions').extensions
351
409
def on_connect(conn):
352
410
extensions.register_type(extensions.UNICODE, conn)
353
411
fns.append(on_connect)
413
if self.dbapi and self.use_native_hstore:
414
def on_connect(conn):
415
hstore_oids = self._hstore_oids(conn)
416
if hstore_oids is not None:
417
oid, array_oid = hstore_oids
419
extras.register_hstore(conn, oid=oid,
423
extras.register_hstore(conn, oid=oid,
425
fns.append(on_connect)
356
428
def on_connect(conn):
435
@util.memoized_instancemethod
436
def _hstore_oids(self, conn):
437
if self.psycopg2_version >= (2, 4):
438
from psycopg2 import extras
439
oids = extras.HstoreAdapter.get_oids(conn)
440
if oids is not None and oids[0]:
363
444
def create_connect_args(self, url):
364
445
opts = url.translate_connect_args(username='user')
365
446
if 'port' in opts:
368
449
return ([], opts)
370
451
def is_disconnect(self, e, connection, cursor):
371
if isinstance(e, self.dbapi.OperationalError):
372
# these error messages from libpq: interfaces/libpq/fe-misc.c.
373
# TODO: these are sent through gettext in libpq and we can't
374
# check within other locales - consider using connection.closed
375
return 'terminating connection' in str(e) or \
376
'closed the connection' in str(e) or \
377
'connection not open' in str(e) or \
378
'could not receive data from server' in str(e)
379
elif isinstance(e, self.dbapi.InterfaceError):
380
# psycopg2 client errors, psycopg2/connection.h, psycopg2/cursor.h
381
return 'connection already closed' in str(e) or \
382
'cursor already closed' in str(e)
383
elif isinstance(e, self.dbapi.ProgrammingError):
384
# not sure where this path is originally from, it may
385
# be obsolete. It really says "losed", not "closed".
386
return "losed the connection unexpectedly" in str(e)
452
if isinstance(e, self.dbapi.Error):
453
str_e = str(e).partition("\n")[0]
455
# these error messages from libpq: interfaces/libpq/fe-misc.c
456
# and interfaces/libpq/fe-secure.c.
457
# TODO: these are sent through gettext in libpq and we can't
458
# check within other locales - consider using connection.closed
459
'terminating connection',
460
'closed the connection',
461
'connection not open',
462
'could not receive data from server',
463
# psycopg2 client errors, psycopg2/connection.h, psycopg2/cursor.h
464
'connection already closed',
465
'cursor already closed',
466
# not sure where this path is originally from, it may
467
# be obsolete. It really says "losed", not "closed".
468
'losed the connection unexpectedly'
470
idx = str_e.find(msg)
471
if idx >= 0 and '"' not in str_e[:idx]:
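The message matching in the newer ``is_disconnect`` can be approximated outside the dialect as below. This is a sketch under assumptions: the function name ``looks_like_disconnect`` is hypothetical, the ``return True`` / ``return False`` outcomes are inferred because those lines are not visible in this excerpt, and the reading of the double-quote check (skipping phrases that appear inside quoted data) is a presumption rather than something the source states.

```python
DISCONNECT_MESSAGES = [
    'terminating connection',
    'closed the connection',
    'connection not open',
    'could not receive data from server',
    'connection already closed',
    'cursor already closed',
    'losed the connection unexpectedly',
]


def looks_like_disconnect(error_text):
    # Only the first line of the error text is examined.
    first_line = error_text.partition("\n")[0]
    for msg in DISCONNECT_MESSAGES:
        idx = first_line.find(msg)
        # A double quote before the match suggests the phrase is part of
        # quoted user data rather than the driver's own message.
        if idx >= 0 and '"' not in first_line[:idx]:
            return True
    return False
```

With this shape, an error whose first line reads ``server closed the connection unexpectedly`` is treated as a disconnect, while the same phrase appearing after a quoted value, or only on a later line, is not.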
390
475
dialect = PGDialect_psycopg2