82
86
sql_transaction(Host, F);
83
87
%% SQL transaction, based on an Erlang anonymous function (F = fun)
84
%% Run the fun F as an SQL transaction for Host.
%% The request {sql_transaction, F} is sent with gen_server:call/3 to the
%% pid returned by ejabberd_odbc_sup:get_random_pid(Host); the call waits
%% at most 60000 ms for the reply and returns whatever that process replies.
sql_transaction(Host, F) ->
    ConnPid = ejabberd_odbc_sup:get_random_pid(Host),
    Request = {sql_transaction, F},
    gen_server:call(ConnPid, Request, 60000).
88
%% Run the fun F as an SQL transaction for Host.
%% The guard only accepts a fun for F; the work itself is delegated to
%% sql_call/2 as the operation {sql_transaction, F}, and its result is
%% returned unchanged.
sql_transaction(Host, F) when is_function(F) ->
    TransactionOp = {sql_transaction, F},
    sql_call(Host, TransactionOp).
91
%% SQL bloc, based on an Erlang anonymous function (F = fun)
93
sql_call(Host, {sql_bloc, F}).
95
sql_call(Host, Msg) ->
96
case get(?STATE_KEY) of
98
gen_server:call(ejabberd_odbc_sup:get_random_pid(Host),
99
{sql_cmd, Msg}, ?TRANSACTION_TIMEOUT);
88
105
%% This function is intended to be used from inside an sql_transaction:
89
106
sql_query_t(Query) ->
90
State = get(?STATE_KEY),
91
QRes = sql_query_internal(State, Query),
107
QRes = sql_query_internal(Query),
93
{error, "No SQL-driver information available."} ->
94
% workaround for odbc bug
110
throw({aborted, Reason});
98
111
Rs when is_list(Rs) ->
99
case lists:keymember(error, 1, Rs) of
112
case lists:keysearch(error, 1, Rs) of
113
{value, {error, Reason}} ->
114
throw({aborted, Reason});
134
147
init([Host, StartInterval]) ->
135
148
case ejabberd_config:get_local_option({odbc_keepalive_interval, Host}) of
136
149
KeepaliveInterval when is_integer(KeepaliveInterval) ->
137
timer:apply_interval(KeepaliveInterval*1000, ?MODULE, keep_alive, [self()]);
150
timer:apply_interval(KeepaliveInterval*1000, ?MODULE,
151
keep_alive, [self()]);
141
?ERROR_MSG("Wrong odbc_keepalive_interval definition '~p' for host ~p.~n", [_Other, Host])
155
?ERROR_MSG("Wrong odbc_keepalive_interval definition '~p'"
156
" for host ~p.~n", [_Other, Host])
143
158
SQLServer = ejabberd_config:get_local_option({odbc_server, Host}),
144
159
case SQLServer of
145
160
%% Default pgsql port
146
161
{pgsql, Server, DB, Username, Password} ->
147
pgsql_connect(Server, ?PGSQL_PORT, DB, Username, Password, StartInterval);
162
pgsql_connect(Server, ?PGSQL_PORT, DB, Username, Password,
148
164
{pgsql, Server, Port, DB, Username, Password} when is_integer(Port) ->
149
pgsql_connect(Server, Port, DB, Username, Password, StartInterval);
165
pgsql_connect(Server, Port, DB, Username, Password,
150
167
%% Default mysql port
151
168
{mysql, Server, DB, Username, Password} ->
152
mysql_connect(Server, ?MYSQL_PORT, DB, Username, Password, StartInterval);
169
mysql_connect(Server, ?MYSQL_PORT, DB, Username, Password,
153
171
{mysql, Server, Port, DB, Username, Password} when is_integer(Port) ->
154
mysql_connect(Server, Port, DB, Username, Password, StartInterval);
172
mysql_connect(Server, Port, DB, Username, Password,
155
174
_ when is_list(SQLServer) ->
156
175
odbc_connect(SQLServer, StartInterval)
165
184
%% {stop, Reason, Reply, State} | (terminate/2 is called)
166
185
%% {stop, Reason, State} (terminate/2 is called)
167
186
%%----------------------------------------------------------------------
168
handle_call({sql_query, Query}, _From, State) ->
169
Reply = sql_query_internal(State, Query),
170
{reply, Reply, State};
172
handle_call({sql_transaction, F}, _From, State) ->
173
Reply = execute_transaction(State, F, ?MAX_TRANSACTION_RESTARTS),
174
{reply, Reply, State};
176
handle_call(_Request, _From, State) ->
178
{reply, Reply, State}.
187
handle_call({sql_cmd, Command}, _From, State) ->
188
put(?NESTING_KEY, ?TOP_LEVEL_TXN),
189
put(?STATE_KEY, State),
190
abort_on_driver_error(outer_op(Command));
191
handle_call(Request, {Who, _Ref}, State) ->
192
?WARNING_MSG("Unexpected call ~p from ~p.", [Request, Who]),
180
195
%%----------------------------------------------------------------------
181
196
%% Func: handle_cast/2
209
224
%% Purpose: Shutdown the server
210
225
%% Returns: any (ignored by gen_server)
211
226
%%----------------------------------------------------------------------
212
terminate(_Reason, _State) ->
227
terminate(_Reason, State) ->
228
case State#state.db_type of
230
% old versions of mysql driver don't have the stop function
232
catch mysql_conn:stop(State#state.db_ref);
215
238
%%%----------------------------------------------------------------------
216
239
%%% Internal functions
217
240
%%%----------------------------------------------------------------------
218
sql_query_internal(State, Query) ->
219
case State#state.db_type of
221
odbc:sql_query(State#state.db_ref, Query);
223
pgsql_to_odbc(pgsql:squery(State#state.db_ref, Query));
225
mysql_to_odbc(mysql_conn:fetch(State#state.db_ref, Query, self()))
228
execute_transaction(_State, _F, 0) ->
229
{aborted, restarts_exceeded};
230
execute_transaction(State, F, NRestarts) ->
231
put(?STATE_KEY, State),
232
sql_query_internal(State, "begin;"),
242
%% Only called by handle_call, only handles top level operations.
243
%% @spec outer_op(Op) -> {error, Reason} | {aborted, Reason} | {atomic, Result}
244
outer_op({sql_query, Query}) ->
245
sql_query_internal(Query);
246
outer_op({sql_transaction, F}) ->
247
outer_transaction(F, ?MAX_TRANSACTION_RESTARTS, "");
248
outer_op({sql_bloc, F}) ->
251
%% Called via sql_query/transaction/bloc from client code when inside a
253
nested_op({sql_query, Query}) ->
254
%% XXX - use sql_query_t here instead? Most likely would break
255
%% callers who expect {error, _} tuples (sql_query_t turns
256
%% these into throws)
257
sql_query_internal(Query);
258
nested_op({sql_transaction, F}) ->
259
NestingLevel = get(?NESTING_KEY),
260
if NestingLevel =:= ?TOP_LEVEL_TXN ->
261
%% First transaction inside a (series of) sql_blocs
262
outer_transaction(F, ?MAX_TRANSACTION_RESTARTS, "");
264
%% Transaction inside a transaction
267
nested_op({sql_bloc, F}) ->
270
%% Never retry nested transactions - only outer transactions
271
inner_transaction(F) ->
272
PreviousNestingLevel = get(?NESTING_KEY),
273
case get(?NESTING_KEY) of
275
{backtrace, T} = process_info(self(), backtrace),
276
?ERROR_MSG("inner transaction called at outer txn level. Trace: ~s",
278
erlang:exit(implementation_faulty);
281
put(?NESTING_KEY, PreviousNestingLevel + 1),
282
Result = (catch F()),
283
put(?NESTING_KEY, PreviousNestingLevel),
295
outer_transaction(F, NRestarts, _Reason) ->
296
PreviousNestingLevel = get(?NESTING_KEY),
297
case get(?NESTING_KEY) of
301
{backtrace, T} = process_info(self(), backtrace),
302
?ERROR_MSG("outer transaction called at inner txn level. Trace: ~s",
304
erlang:exit(implementation_faulty)
306
sql_query_internal("begin;"),
307
put(?NESTING_KEY, PreviousNestingLevel + 1),
308
Result = (catch F()),
309
put(?NESTING_KEY, PreviousNestingLevel),
311
{aborted, Reason} when NRestarts > 0 ->
312
%% Retry outer transaction up to NRestarts times.
313
sql_query_internal("rollback;"),
314
outer_transaction(F, NRestarts - 1, Reason);
315
{aborted, Reason} when NRestarts =:= 0 ->
316
%% Too many retries of outer transaction.
317
?ERROR_MSG("SQL transaction restarts exceeded~n"
319
"** Last abort reason: ~p~n"
320
"** Stacktrace: ~p~n"
321
"** When State == ~p",
322
[?MAX_TRANSACTION_RESTARTS, Reason,
323
erlang:get_stacktrace(), get(?STATE_KEY)]),
324
sql_query_internal("rollback;"),
327
%% Abort sql transaction on EXIT from outer txn only.
328
sql_query_internal("rollback;"),
331
%% Commit successful outer txn
332
sql_query_internal("commit;"),
337
%% We don't alter ?NESTING_KEY here as only SQL transactions alter
233
339
case catch F() of
235
execute_transaction(State, F, NRestarts - 1);
237
sql_query_internal(State, "rollback;"),
240
sql_query_internal(State, "commit;"),
348
sql_query_internal(Query) ->
349
State = get(?STATE_KEY),
350
Res = case State#state.db_type of
352
odbc:sql_query(State#state.db_ref, Query);
354
pgsql_to_odbc(pgsql:squery(State#state.db_ref, Query));
356
?DEBUG("MySQL, Send query~n~p~n", [Query]),
357
R = mysql_to_odbc(mysql_conn:fetch(State#state.db_ref,
359
%% ?INFO_MSG("MySQL, Received result~n~p~n", [R]),
363
{error, "No SQL-driver information available."} ->
364
% workaround for odbc bug
369
%% Generate the OTP callback return tuple depending on the driver result.
%% The state placed in the tuple is read from the process dictionary
%% under ?STATE_KEY, and Reply is always passed back as the reply term:
%%   {error, "query timed out"}                     -> {stop, timeout, ...}
%%   {error, "Failed sending data on socket" ++ _}  -> {stop, closed, ...}
%%   any other Reply                                -> {reply, ...}
abort_on_driver_error(Reply) ->
    State = get(?STATE_KEY),
    case Reply of
        {error, "query timed out"} ->
            %% mysql driver error
            {stop, timeout, Reply, State};
        {error, "Failed sending data on socket" ++ _} ->
            %% mysql driver error
            {stop, closed, Reply, State};
        _ ->
            {reply, Reply, State}
    end.
244
380
%% == pure ODBC code