2
# Copyright (C) 2005-2012 the SQLAlchemy authors and contributors <see AUTHORS file>
2
# Copyright (C) 2005-2013 the SQLAlchemy authors and contributors <see AUTHORS file>
4
4
# This module is part of SQLAlchemy and is released under
5
5
# the MIT License: http://www.opensource.org/licenses/mit-license.php
7
"""Support for the Oracle database.
9
Oracle version 8 through current (11g at the time of this writing) are supported.
11
For information on connecting via specific drivers, see the documentation
11
Oracle version 8 through current (11g at the time of this writing) are supported.
145
from sqlalchemy import schema as sa_schema
146
from sqlalchemy import util, sql, log
144
from sqlalchemy import util, sql
147
145
from sqlalchemy.engine import default, base, reflection
148
146
from sqlalchemy.sql import compiler, visitors, expression
149
147
from sqlalchemy.sql import operators as sql_operators, functions as sql_functions
203
205
class DOUBLE_PRECISION(sqltypes.Numeric):
204
206
__visit_name__ = 'DOUBLE_PRECISION'
205
208
def __init__(self, precision=None, scale=None, asdecimal=None):
206
209
if asdecimal is None:
207
210
asdecimal = False
209
212
super(DOUBLE_PRECISION, self).__init__(precision=precision, scale=scale, asdecimal=asdecimal)
211
215
class BFILE(sqltypes.LargeBinary):
212
216
__visit_name__ = 'BFILE'
214
219
class LONG(sqltypes.Text):
215
220
__visit_name__ = 'LONG'
217
223
class INTERVAL(sqltypes.TypeEngine):
218
224
__visit_name__ = 'INTERVAL'
253
260
__visit_name__ = 'ROWID'
257
263
class _OracleBoolean(sqltypes.Boolean):
258
264
def get_dbapi_type(self, dbapi):
259
265
return dbapi.NUMBER
262
sqltypes.Boolean : _OracleBoolean,
263
sqltypes.Interval : INTERVAL,
268
sqltypes.Boolean: _OracleBoolean,
269
sqltypes.Interval: INTERVAL,
266
272
ischema_names = {
267
'VARCHAR2' : VARCHAR,
268
'NVARCHAR2' : NVARCHAR,
276
'TIMESTAMP' : TIMESTAMP,
277
'TIMESTAMP WITH TIME ZONE' : TIMESTAMP,
278
'INTERVAL DAY TO SECOND' : INTERVAL,
281
'DOUBLE PRECISION' : DOUBLE_PRECISION,
274
'NVARCHAR2': NVARCHAR,
282
'TIMESTAMP': TIMESTAMP,
283
'TIMESTAMP WITH TIME ZONE': TIMESTAMP,
284
'INTERVAL DAY TO SECOND': INTERVAL,
287
'DOUBLE PRECISION': DOUBLE_PRECISION,
336
342
if precision is None:
338
344
elif scale is None:
339
return "%(name)s(%(precision)s)" % {'name':name,'precision': precision}
345
n = "%(name)s(%(precision)s)"
346
return n % {'name': name, 'precision': precision}
341
return "%(name)s(%(precision)s, %(scale)s)" % {'name':name,'precision': precision, 'scale' : scale}
348
n = "%(name)s(%(precision)s, %(scale)s)"
349
return n % {'name': name, 'precision': precision, 'scale': scale}
343
351
def visit_string(self, type_):
344
352
return self.visit_VARCHAR2(type_)
356
364
def _visit_varchar(self, type_, n, num):
357
365
if not n and self.dialect._supports_char_length:
358
return "VARCHAR%(two)s(%(length)s CHAR)" % {
359
'length' : type_.length,
366
varchar = "VARCHAR%(two)s(%(length)s CHAR)"
367
return varchar % {'length': type_.length, 'two': num}
362
return "%(n)sVARCHAR%(two)s(%(length)s)" % {'length' : type_.length,
369
varchar = "%(n)sVARCHAR%(two)s(%(length)s)"
370
return varchar % {'length': type_.length, 'two': num, 'n': n}
365
372
def visit_text(self, type_):
366
373
return self.visit_CLOB(type_)
407
415
self._quoted_bind_names = {}
408
416
super(OracleCompiler, self).__init__(*args, **kwargs)
410
def visit_mod(self, binary, **kw):
411
return "mod(%s, %s)" % (self.process(binary.left), self.process(binary.right))
418
def visit_mod_binary(self, binary, operator, **kw):
419
return "mod(%s, %s)" % (self.process(binary.left, **kw),
420
self.process(binary.right, **kw))
413
422
def visit_now_func(self, fn, **kw):
414
423
return "CURRENT_TIMESTAMP"
416
425
def visit_char_length_func(self, fn, **kw):
417
426
return "LENGTH" + self.function_argspec(fn, **kw)
419
def visit_match_op(self, binary, **kw):
420
return "CONTAINS (%s, %s)" % (self.process(binary.left), self.process(binary.right))
428
def visit_match_op_binary(self, binary, operator, **kw):
429
return "CONTAINS (%s, %s)" % (self.process(binary.left),
430
self.process(binary.right))
432
def visit_true(self, expr, **kw):
435
def visit_false(self, expr, **kw):
422
438
def get_select_hint_text(self, byfroms):
500
516
def returning_clause(self, stmt, returning_cols):
502
def create_out_param(col, i):
503
bindparam = sql.outparam("ret_%d" % i, type_=col.type)
504
self.binds[bindparam.key] = bindparam
505
return self.bindparam_string(self._truncate_bindparam(bindparam))
507
columnlist = list(expression._select_iterables(returning_cols))
509
# within_columns_clause =False so that labels (foo AS bar) don't render
510
columns = [self.process(c, within_columns_clause=False, result_map=self.result_map) for c in columnlist]
512
binds = [create_out_param(c, i) for i, c in enumerate(columnlist)]
514
return 'RETURNING ' + ', '.join(columns) + " INTO " + ", ".join(binds)
520
for i, column in enumerate(expression._select_iterables(returning_cols)):
521
if column.type._has_column_expression:
522
col_expr = column.type.column_expression(column)
525
outparam = sql.outparam("ret_%d" % i, type_=column.type)
526
self.binds[outparam.key] = outparam
527
binds.append(self.bindparam_string(self._truncate_bindparam(outparam)))
528
columns.append(self.process(col_expr, within_columns_clause=False))
529
self.result_map[outparam.key] = (
531
(column, getattr(column, 'name', None),
532
getattr(column, 'key', None)),
536
return 'RETURNING ' + ', '.join(columns) + " INTO " + ", ".join(binds)
516
538
def _TODO_visit_compound_select(self, select):
517
539
"""Need to determine how to get ``LIMIT``/``OFFSET`` into a ``UNION`` for Oracle."""
525
547
if not getattr(select, '_oracle_visit', None):
526
548
if not self.dialect.use_ansi:
527
if self.stack and 'from' in self.stack[-1]:
528
existingfroms = self.stack[-1]['from']
532
froms = select._get_display_froms(existingfroms)
549
froms = self._display_froms_for_select(
550
select, kwargs.get('asfrom', False))
533
551
whereclause = self._get_nonansi_join_whereclause(froms)
534
552
if whereclause is not None:
535
553
select = select.where(whereclause)
809
834
if resolve_synonyms:
810
835
actual_name, owner, dblink, synonym = self._resolve_synonym(
812
desired_owner=self.denormalize_name(schema),
813
desired_synonym=self.denormalize_name(table_name)
837
desired_owner=self.denormalize_name(schema),
838
desired_synonym=self.denormalize_name(table_name)
816
841
actual_name, owner, dblink, synonym = None, None, None, None
817
842
if not actual_name:
818
843
actual_name = self.denormalize_name(table_name)
846
# using user_db_links here since all_db_links appears
847
# to have more restricted permissions.
848
# http://docs.oracle.com/cd/B28359_01/server.111/b28310/ds_admin005.htm
849
# will need to hear from more users if we are doing
850
# the right thing here. See [ticket:2619]
851
owner = connection.scalar(
852
sql.text("SELECT username FROM user_db_links "
853
"WHERE db_link=:link"), link=dblink)
854
dblink = "@" + dblink
822
856
owner = self.denormalize_name(schema or self.default_schema_name)
823
return (actual_name, owner, dblink, synonym)
858
return (actual_name, owner, dblink or '', synonym)
825
860
@reflection.cache
826
861
def get_schema_names(self, connection, **kw):
878
912
char_length_col = 'data_length'
880
c = connection.execute(sql.text(
881
"SELECT column_name, data_type, %(char_length_col)s, data_precision, data_scale, "
882
"nullable, data_default FROM ALL_TAB_COLUMNS%(dblink)s "
883
"WHERE table_name = :table_name AND owner = :owner "
884
"ORDER BY column_id" % {'dblink': dblink, 'char_length_col':char_length_col}),
885
table_name=table_name, owner=schema)
914
params = {"table_name": table_name}
915
text = "SELECT column_name, data_type, %(char_length_col)s, "\
916
"data_precision, data_scale, "\
917
"nullable, data_default FROM ALL_TAB_COLUMNS%(dblink)s "\
918
"WHERE table_name = :table_name"
919
if schema is not None:
920
params['owner'] = schema
921
text += " AND owner = :owner "
922
text += " ORDER BY column_id"
923
text = text % {'dblink': dblink, 'char_length_col': char_length_col}
925
c = connection.execute(sql.text(text), **params)
888
928
(colname, orig_colname, coltype, length, precision, scale, nullable, default) = \
889
(self.normalize_name(row[0]), row[0], row[1], row[2], row[3], row[4], row[5]=='Y', row[6])
929
(self.normalize_name(row[0]), row[0], row[1], row[2], row[3], row[4], row[5] == 'Y', row[6])
891
if coltype == 'NUMBER' :
931
if coltype == 'NUMBER':
892
932
coltype = NUMBER(precision, scale)
893
933
elif coltype in ('VARCHAR2', 'NVARCHAR2', 'CHAR'):
894
934
coltype = self.ischema_names.get(coltype)(length)
920
960
def get_indexes(self, connection, table_name, schema=None,
921
961
resolve_synonyms=False, dblink='', **kw):
924
963
info_cache = kw.get('info_cache')
925
964
(table_name, schema, dblink, synonym) = \
926
965
self._prepare_reflection_args(connection, table_name, schema,
927
966
resolve_synonyms, dblink,
928
967
info_cache=info_cache)
931
SELECT a.index_name, a.column_name, b.uniqueness
932
FROM ALL_IND_COLUMNS%(dblink)s a,
933
ALL_INDEXES%(dblink)s b
935
a.index_name = b.index_name
936
AND a.table_owner = b.table_owner
937
AND a.table_name = b.table_name
939
AND a.table_name = :table_name
940
AND a.table_owner = :schema
941
ORDER BY a.index_name, a.column_position""" % {'dblink': dblink})
942
rp = connection.execute(q, table_name=self.denormalize_name(table_name),
943
schema=self.denormalize_name(schema))
970
params = {'table_name': table_name}
972
"SELECT a.index_name, a.column_name, b.uniqueness "\
973
"\nFROM ALL_IND_COLUMNS%(dblink)s a, "\
974
"\nALL_INDEXES%(dblink)s b "\
976
"\na.index_name = b.index_name "\
977
"\nAND a.table_owner = b.table_owner "\
978
"\nAND a.table_name = b.table_name "\
979
"\nAND a.table_name = :table_name "
981
if schema is not None:
982
params['schema'] = schema
983
text += "AND a.table_owner = :schema "
985
text += "ORDER BY a.index_name, a.column_position"
987
text = text % {'dblink': dblink}
990
rp = connection.execute(q, **params)
945
992
last_index_name = None
946
pkeys = self.get_primary_keys(connection, table_name, schema,
947
resolve_synonyms=resolve_synonyms,
949
info_cache=kw.get('info_cache'))
993
pk_constraint = self.get_pk_constraint(
994
connection, table_name, schema, resolve_synonyms=resolve_synonyms,
995
dblink=dblink, info_cache=kw.get('info_cache'))
996
pkeys = pk_constraint['constrained_columns']
950
997
uniqueness = dict(NONUNIQUE=False, UNIQUE=True)
952
999
oracle_sys_col = re.compile(r'SYS_NC\d+\$', re.IGNORECASE)
982
1029
def _get_constraint_data(self, connection, table_name, schema=None,
983
1030
dblink='', **kw):
985
rp = connection.execute(
989
loc.column_name AS local_column,
990
rem.table_name AS remote_table,
991
rem.column_name AS remote_column,
992
rem.owner AS remote_owner,
993
loc.position as loc_pos,
994
rem.position as rem_pos
995
FROM all_constraints%(dblink)s ac,
996
all_cons_columns%(dblink)s loc,
997
all_cons_columns%(dblink)s rem
998
WHERE ac.table_name = :table_name
999
AND ac.constraint_type IN ('R','P')
1000
AND ac.owner = :owner
1001
AND ac.owner = loc.owner
1002
AND ac.constraint_name = loc.constraint_name
1003
AND ac.r_owner = rem.owner(+)
1004
AND ac.r_constraint_name = rem.constraint_name(+)
1005
AND (rem.position IS NULL or loc.position=rem.position)
1006
ORDER BY ac.constraint_name, loc.position""" % {'dblink': dblink}),
1007
table_name=table_name, owner=schema)
1032
params = {'table_name': table_name}
1036
"\nac.constraint_name,"\
1037
"\nac.constraint_type,"\
1038
"\nloc.column_name AS local_column,"\
1039
"\nrem.table_name AS remote_table,"\
1040
"\nrem.column_name AS remote_column,"\
1041
"\nrem.owner AS remote_owner,"\
1042
"\nloc.position as loc_pos,"\
1043
"\nrem.position as rem_pos"\
1044
"\nFROM all_constraints%(dblink)s ac,"\
1045
"\nall_cons_columns%(dblink)s loc,"\
1046
"\nall_cons_columns%(dblink)s rem"\
1047
"\nWHERE ac.table_name = :table_name"\
1048
"\nAND ac.constraint_type IN ('R','P')"
1050
if schema is not None:
1051
params['owner'] = schema
1052
text += "\nAND ac.owner = :owner"
1055
"\nAND ac.owner = loc.owner"\
1056
"\nAND ac.constraint_name = loc.constraint_name"\
1057
"\nAND ac.r_owner = rem.owner(+)"\
1058
"\nAND ac.r_constraint_name = rem.constraint_name(+)"\
1059
"\nAND (rem.position IS NULL or loc.position=rem.position)"\
1060
"\nORDER BY ac.constraint_name, loc.position"
1062
text = text % {'dblink': dblink}
1063
rp = connection.execute(sql.text(text), **params)
1008
1064
constraint_data = rp.fetchall()
1009
1065
return constraint_data
1011
def get_primary_keys(self, connection, table_name, schema=None, **kw):
1014
kw arguments can be:
1016
oracle_resolve_synonyms
1021
return self._get_primary_keys(connection, table_name, schema, **kw)[0]
1023
1067
@reflection.cache
1024
def _get_primary_keys(self, connection, table_name, schema=None, **kw):
1068
def get_pk_constraint(self, connection, table_name, schema=None, **kw):
1025
1069
resolve_synonyms = kw.get('oracle_resolve_synonyms', False)
1026
1070
dblink = kw.get('dblink', '')
1027
1071
info_cache = kw.get('info_cache')
1037
1081
info_cache=kw.get('info_cache'))
1039
1083
for row in constraint_data:
1041
1084
(cons_name, cons_type, local_column, remote_table, remote_column, remote_owner) = \
1042
1085
row[0:2] + tuple([self.normalize_name(x) for x in row[2:6]])
1043
1086
if cons_type == 'P':
1044
1087
if constraint_name is None:
1045
1088
constraint_name = self.normalize_name(cons_name)
1046
1089
pkeys.append(local_column)
1047
return pkeys, constraint_name
1049
def get_pk_constraint(self, connection, table_name, schema=None, **kw):
1050
cols, name = self._get_primary_keys(connection, table_name, schema=schema, **kw)
1053
'constrained_columns':cols,
1090
return {'constrained_columns': pkeys, 'name': constraint_name}
1057
1092
@reflection.cache
1058
1093
def get_foreign_keys(self, connection, table_name, schema=None, **kw):
1138
1173
self._prepare_reflection_args(connection, view_name, schema,
1139
1174
resolve_synonyms, dblink,
1140
1175
info_cache=info_cache)
1142
SELECT text FROM all_views
1143
WHERE owner = :schema
1144
AND view_name = :view_name
1146
rp = connection.execute(s,
1147
view_name=view_name, schema=schema).scalar()
1177
params = {'view_name': view_name}
1178
text = "SELECT text FROM all_views WHERE view_name=:view_name"
1180
if schema is not None:
1181
text += " AND owner = :schema"
1182
params['schema'] = schema
1184
rp = connection.execute(sql.text(text), **params).scalar()
1149
1186
return rp.decode(self.encoding)
1155
1191
class _OuterJoinColumn(sql.ClauseElement):
1156
1192
__visit_name__ = 'outer_join_column'
1158
1194
def __init__(self, column):
1159
1195
self.column = column