119
120
case get(?STATE_KEY) of
121
122
?GEN_FSM:sync_send_event(ejabberd_odbc_sup:get_random_pid(Host),
122
{sql_cmd, Msg}, ?TRANSACTION_TIMEOUT);
123
{sql_cmd, Msg, now()}, ?TRANSACTION_TIMEOUT);
127
128
% Perform a harmless query on all open connections to keep them from being closed.
128
129
keep_alive(PID) ->
129
?GEN_FSM:sync_send_event(PID, {sql_cmd, {sql_query, ?KEEPALIVE_QUERY}},
130
?GEN_FSM:sync_send_event(PID, {sql_cmd, {sql_query, ?KEEPALIVE_QUERY}, now()},
130
131
?KEEPALIVE_TIMEOUT).
132
133
%% This function is intended to be used from inside an sql_transaction:
149
150
%% Escape characters that would confuse an SQL engine
150
151
escape(S) when is_list(S) ->
151
[odbc_queries:escape(C) || C <- S].
152
[odbc_queries:escape(C) || C <- S];
153
escape(S) when is_binary(S) ->
154
escape(binary_to_list(S)).
153
156
%% Escape characters that would confuse an SQL engine
154
157
%% Percent and underscore only need to be escaped for pattern matching like
177
186
[DBType | _] = db_opts(Host),
178
187
?GEN_FSM:send_event(self(), connect),
188
ejabberd_odbc_sup:add_pid(Host, self()),
179
189
{ok, connecting, #state{db_type = DBType,
181
191
max_pending_requests_len = max_fsm_queue(),
216
226
?WARNING_MSG("unexpected event in 'connecting': ~p", [Event]),
217
227
{next_state, connecting, State}.
219
connecting({sql_cmd, {sql_query, ?KEEPALIVE_QUERY}}, From, State) ->
229
connecting({sql_cmd, {sql_query, ?KEEPALIVE_QUERY}, _Timestamp}, From, State) ->
220
230
?GEN_FSM:reply(From, {error, "SQL connection failed"}),
221
231
{next_state, connecting, State};
222
connecting({sql_cmd, Command} = Req, From, State) ->
223
?DEBUG("queueing pending request while connecting:~n\t~p", [Req]),
232
connecting({sql_cmd, Command, Timestamp} = Req, From, State) ->
233
?DEBUG("queuing pending request while connecting:~n\t~p", [Req]),
224
234
{Len, PendingRequests} = State#state.pending_requests,
225
235
NewPendingRequests =
226
236
if Len < State#state.max_pending_requests_len ->
227
{Len + 1, queue:in({sql_cmd, Command, From}, PendingRequests)};
237
{Len + 1, queue:in({sql_cmd, Command, From, Timestamp}, PendingRequests)};
230
fun({sql_cmd, _, To}) ->
240
fun({sql_cmd, _, To, _Timestamp}) ->
232
242
To, {error, "SQL connection failed"})
233
243
end, queue:to_list(PendingRequests)),
234
{1, queue:from_list([{sql_cmd, Command, From}])}
244
{1, queue:from_list([{sql_cmd, Command, From, Timestamp}])}
236
246
{next_state, connecting,
237
247
State#state{pending_requests = NewPendingRequests}};
241
251
{reply, {error, badarg}, connecting, State}.
243
session_established({sql_cmd, Command}, From, State) ->
244
put(?NESTING_KEY, ?TOP_LEVEL_TXN),
245
put(?STATE_KEY, State),
246
abort_on_driver_error(outer_op(Command), From);
253
session_established({sql_cmd, Command, Timestamp}, From, State) ->
254
run_sql_cmd(Command, From, State, Timestamp);
247
255
session_established(Request, {Who, _Ref}, State) ->
248
256
?WARNING_MSG("unexpected call ~p from ~p in 'session_established'",
250
258
{reply, {error, badarg}, session_established, State}.
252
session_established({sql_cmd, Command, From}, State) ->
253
put(?NESTING_KEY, ?TOP_LEVEL_TXN),
254
put(?STATE_KEY, State),
255
abort_on_driver_error(outer_op(Command), From);
260
session_established({sql_cmd, Command, From, Timestamp}, State) ->
261
run_sql_cmd(Command, From, State, Timestamp);
256
262
session_established(Event, State) ->
257
263
?WARNING_MSG("unexpected event in 'session_established': ~p", [Event]),
258
264
{next_state, session_established, State}.
276
282
{next_state, StateName, State}.
278
284
terminate(_Reason, _StateName, State) ->
285
ejabberd_odbc_sup:remove_pid(State#state.host, self()),
279
286
case State#state.db_type of
281
288
%% old versions of mysql driver don't have the stop function
297
304
%%% Internal functions
298
305
%%%----------------------------------------------------------------------
307
run_sql_cmd(Command, From, State, Timestamp) ->
308
case timer:now_diff(now(), Timestamp) div 1000 of
309
Age when Age < ?TRANSACTION_TIMEOUT ->
310
put(?NESTING_KEY, ?TOP_LEVEL_TXN),
311
put(?STATE_KEY, State),
312
abort_on_driver_error(outer_op(Command), From);
314
?ERROR_MSG("Database was not available or too slow,"
315
" discarding ~p milliseconds old request~n~p~n",
317
{next_state, session_established, State}
300
320
%% Only called by handle_call, only handles top level operations.
301
321
%% @spec outer_op(Op) -> {error, Reason} | {aborted, Reason} | {atomic, Result}
302
322
outer_op({sql_query, Query}) ->