~ubuntu-branches/ubuntu/trusty/ejabberd/trusty-proposed

« back to all changes in this revision

Viewing changes to src/odbc/ejabberd_odbc.erl

  • Committer: Bazaar Package Importer
  • Author(s): Gerfried Fuchs, Konstantin Khomoutov, Gerfried Fuchs
  • Date: 2009-12-04 18:22:49 UTC
  • mfrom: (1.1.11 upstream)
  • Revision ID: james.westby@ubuntu.com-20091204182249-6jfmdz8878h7oaos
Tags: 2.1.0-1
[ Konstantin Khomoutov ]
* New upstream release (Closes: #519858).
  This also adds support for LDAPS upstream (Closes: #526145).
* Do not depend on cdbs anymore, port debian/rules to dh+quilt,
  remove build dependency on patchutils, use erlang-depends.
* Bump debhelper version to 7, standards base to 3.8.3
* Depend on erlang R13B.
* Recommend imagemagick (for captcha support).
* Remove deprecated patches (ssl.patch patch, dynamic_compile_loglevel.patch,
  ldaps.patch, update.patch, proxy.patch, caps.patch, convert.patch,
  s2s.patch).
* Replace mod_ctlextra with mod_admin_extra.
* Use upstream inetrc file.
* Bring debian/ejabberd.cfg and ejabberdctl in sync with upstream.
* Update ejabberdctl manual page.
* Provide NEWS file.
* Rework README.Debian:
  * Group all information into sections.
  * Describe issues with epam binary (Closes: #502791).
  * Discuss how to use DBMS backends (Closes: #540915, #507144).
  * Discuss upgrading from 2.0.x series.
* Implement PID file management (Closes: #519858).
* Make logrotate process all files matching "*.log".
* Improve init script:
  * Make init script LSB-compliant.
  * Implement "live" target which allows to run ejabberd in foreground.
* Make captcha.sh use bash explicitly.
* Rework node-generation for ejabberdctl to fix ejabberd's atom table
  overflows while preserving the possibility to run several versions
  of ejabberdctl concurrently as before.
* Add webadmin patch restoring compatibility with Erlang/OTP <= R12B-4.
* Integrate upstream patch for EJAB-1106.
* Add upstream patch for EJAB-1098.
* Add upstream patch for EJAB-1045.
* Add Konstantin Khomoutov to uploaders.
* Add Japanese debconf translation (thanks to Hideki Yamane)
  (Closes: #558071).

[ Gerfried Fuchs ]
* Build-Depend on po-debconf so po2debconf can be called.

Show diffs side-by-side

added added

removed removed

Lines of Context:
34
34
         sql_query/2,
35
35
         sql_query_t/1,
36
36
         sql_transaction/2,
 
37
         sql_bloc/2,
37
38
         escape/1,
38
39
         escape_like/1,
39
40
         keep_alive/1]).
51
52
-record(state, {db_ref, db_type}).
52
53
 
53
54
-define(STATE_KEY, ejabberd_odbc_state).
 
55
-define(NESTING_KEY, ejabberd_odbc_nesting_level).
 
56
-define(TOP_LEVEL_TXN, 0).
54
57
-define(MAX_TRANSACTION_RESTARTS, 10).
55
58
-define(PGSQL_PORT, 5432).
56
59
-define(MYSQL_PORT, 3306).
57
60
 
 
61
-define(TRANSACTION_TIMEOUT, 60000). % milliseconds
 
62
-define(KEEPALIVE_TIMEOUT, 60000).
58
63
-define(KEEPALIVE_QUERY, "SELECT 1;").
59
64
 
60
65
%%%----------------------------------------------------------------------
67
72
    gen_server:start_link(ejabberd_odbc, [Host, StartInterval], []).
68
73
 
69
74
sql_query(Host, Query) ->
70
 
    gen_server:call(ejabberd_odbc_sup:get_random_pid(Host),
71
 
                    {sql_query, Query}, 60000).
 
75
    sql_call(Host, {sql_query, Query}).
72
76
 
73
77
%% SQL transaction based on a list of queries
74
78
%% This function automatically
81
85
        end,
82
86
    sql_transaction(Host, F);
83
87
%% SQL transaction, based on a erlang anonymous function (F = fun)
84
 
sql_transaction(Host, F) ->
85
 
    gen_server:call(ejabberd_odbc_sup:get_random_pid(Host),
86
 
                    {sql_transaction, F}, 60000).
 
88
sql_transaction(Host, F) when is_function(F) ->
 
89
    sql_call(Host, {sql_transaction, F}).
 
90
 
 
91
%% SQL bloc, based on a erlang anonymous function (F = fun)
 
92
sql_bloc(Host, F) ->
 
93
    sql_call(Host, {sql_bloc, F}).
 
94
 
 
95
sql_call(Host, Msg) ->
 
96
    case get(?STATE_KEY) of
 
97
        undefined ->
 
98
            gen_server:call(ejabberd_odbc_sup:get_random_pid(Host),
 
99
                            {sql_cmd, Msg}, ?TRANSACTION_TIMEOUT);
 
100
        _State ->
 
101
            nested_op(Msg)
 
102
    end.
 
103
 
87
104
 
88
105
%% This function is intended to be used from inside an sql_transaction:
89
106
sql_query_t(Query) ->
90
 
    State = get(?STATE_KEY),
91
 
    QRes = sql_query_internal(State, Query),
 
107
    QRes = sql_query_internal(Query),
92
108
    case QRes of
93
 
        {error, "No SQL-driver information available."} ->
94
 
            % workaround for odbc bug
95
 
            {updated, 0};
96
 
        {error, _} ->
97
 
            throw(aborted);
 
109
        {error, Reason} ->
 
110
            throw({aborted, Reason});
98
111
        Rs when is_list(Rs) ->
99
 
            case lists:keymember(error, 1, Rs) of
100
 
                true ->
101
 
                    throw(aborted);
 
112
            case lists:keysearch(error, 1, Rs) of
 
113
                {value, {error, Reason}} ->
 
114
                    throw({aborted, Reason});
102
115
                _ ->
103
116
                    QRes
104
117
            end;
134
147
init([Host, StartInterval]) ->
135
148
    case ejabberd_config:get_local_option({odbc_keepalive_interval, Host}) of
136
149
        KeepaliveInterval when is_integer(KeepaliveInterval) ->
137
 
            timer:apply_interval(KeepaliveInterval*1000, ?MODULE, keep_alive, [self()]);
 
150
            timer:apply_interval(KeepaliveInterval*1000, ?MODULE,
 
151
                                 keep_alive, [self()]);
138
152
        undefined ->
139
153
            ok;
140
154
        _Other ->
141
 
            ?ERROR_MSG("Wrong odbc_keepalive_interval definition '~p' for host ~p.~n", [_Other, Host])
 
155
            ?ERROR_MSG("Wrong odbc_keepalive_interval definition '~p'"
 
156
                       " for host ~p.~n", [_Other, Host])
142
157
    end,
143
158
    SQLServer = ejabberd_config:get_local_option({odbc_server, Host}),
144
159
    case SQLServer of
145
160
        %% Default pgsql port
146
161
        {pgsql, Server, DB, Username, Password} ->
147
 
            pgsql_connect(Server, ?PGSQL_PORT, DB, Username, Password, StartInterval);
 
162
            pgsql_connect(Server, ?PGSQL_PORT, DB, Username, Password,
 
163
                          StartInterval);
148
164
        {pgsql, Server, Port, DB, Username, Password} when is_integer(Port) ->
149
 
            pgsql_connect(Server, Port, DB, Username, Password, StartInterval);
 
165
            pgsql_connect(Server, Port, DB, Username, Password,
 
166
                          StartInterval);
150
167
        %% Default mysql port
151
168
        {mysql, Server, DB, Username, Password} ->
152
 
            mysql_connect(Server, ?MYSQL_PORT, DB, Username, Password, StartInterval);
 
169
            mysql_connect(Server, ?MYSQL_PORT, DB, Username, Password,
 
170
                          StartInterval);
153
171
        {mysql, Server, Port, DB, Username, Password} when is_integer(Port) ->
154
 
            mysql_connect(Server, Port, DB, Username, Password, StartInterval);
 
172
            mysql_connect(Server, Port, DB, Username, Password,
 
173
                          StartInterval);
155
174
        _ when is_list(SQLServer) ->
156
175
            odbc_connect(SQLServer, StartInterval)
157
176
    end.
165
184
%%          {stop, Reason, Reply, State}   | (terminate/2 is called)
166
185
%%          {stop, Reason, State}            (terminate/2 is called)
167
186
%%----------------------------------------------------------------------
168
 
handle_call({sql_query, Query}, _From, State) ->
169
 
    Reply = sql_query_internal(State, Query),
170
 
    {reply, Reply, State};
171
 
 
172
 
handle_call({sql_transaction, F}, _From, State) ->
173
 
    Reply = execute_transaction(State, F, ?MAX_TRANSACTION_RESTARTS),
174
 
    {reply, Reply, State};
175
 
 
176
 
handle_call(_Request, _From, State) ->
177
 
    Reply = ok,
178
 
    {reply, Reply, State}.
 
187
handle_call({sql_cmd, Command}, _From, State) ->
 
188
    put(?NESTING_KEY, ?TOP_LEVEL_TXN),
 
189
    put(?STATE_KEY, State),
 
190
    abort_on_driver_error(outer_op(Command));
 
191
handle_call(Request, {Who, _Ref}, State) ->
 
192
    ?WARNING_MSG("Unexpected call ~p from ~p.", [Request, Who]),
 
193
    {reply, ok, State}.
179
194
 
180
195
%%----------------------------------------------------------------------
181
196
%% Func: handle_cast/2
209
224
%% Purpose: Shutdown the server
210
225
%% Returns: any (ignored by gen_server)
211
226
%%----------------------------------------------------------------------
212
 
terminate(_Reason, _State) ->
 
227
terminate(_Reason, State) ->
 
228
    case State#state.db_type of
 
229
        mysql ->
 
230
            % old versions of mysql driver don't have the stop function
 
231
            % so the catch
 
232
            catch mysql_conn:stop(State#state.db_ref);
 
233
        _ ->
 
234
            ok
 
235
    end,
213
236
    ok.
214
237
 
215
238
%%%----------------------------------------------------------------------
216
239
%%% Internal functions
217
240
%%%----------------------------------------------------------------------
218
 
sql_query_internal(State, Query) ->
219
 
    case State#state.db_type of
220
 
        odbc ->
221
 
            odbc:sql_query(State#state.db_ref, Query);
222
 
        pgsql ->
223
 
            pgsql_to_odbc(pgsql:squery(State#state.db_ref, Query));
224
 
        mysql ->
225
 
            mysql_to_odbc(mysql_conn:fetch(State#state.db_ref, Query, self()))
226
 
    end.
227
 
 
228
 
execute_transaction(_State, _F, 0) ->
229
 
    {aborted, restarts_exceeded};
230
 
execute_transaction(State, F, NRestarts) ->
231
 
    put(?STATE_KEY, State),
232
 
    sql_query_internal(State, "begin;"),
 
241
 
 
242
%% Only called by handle_call, only handles top level operations.
 
243
%% @spec outer_op(Op) -> {error, Reason} | {aborted, Reason} | {atomic, Result}
 
244
outer_op({sql_query, Query}) ->
 
245
    sql_query_internal(Query);
 
246
outer_op({sql_transaction, F}) ->
 
247
    outer_transaction(F, ?MAX_TRANSACTION_RESTARTS, "");
 
248
outer_op({sql_bloc, F}) ->
 
249
    execute_bloc(F).
 
250
 
 
251
%% Called via sql_query/transaction/bloc from client code when inside a
 
252
%% nested operation
 
253
nested_op({sql_query, Query}) ->
 
254
    %% XXX - use sql_query_t here insted? Most likely would break
 
255
    %% callers who expect {error, _} tuples (sql_query_t turns
 
256
    %% these into throws)
 
257
    sql_query_internal(Query);
 
258
nested_op({sql_transaction, F}) ->
 
259
    NestingLevel = get(?NESTING_KEY),
 
260
    if NestingLevel =:= ?TOP_LEVEL_TXN ->
 
261
            %% First transaction inside a (series of) sql_blocs
 
262
            outer_transaction(F, ?MAX_TRANSACTION_RESTARTS, "");
 
263
       true ->
 
264
            %% Transaction inside a transaction
 
265
            inner_transaction(F)
 
266
    end;
 
267
nested_op({sql_bloc, F}) ->
 
268
    execute_bloc(F).
 
269
 
 
270
%% Never retry nested transactions - only outer transactions
 
271
inner_transaction(F) ->
 
272
    PreviousNestingLevel = get(?NESTING_KEY),
 
273
    case get(?NESTING_KEY) of
 
274
        ?TOP_LEVEL_TXN ->
 
275
            {backtrace, T} = process_info(self(), backtrace),
 
276
            ?ERROR_MSG("inner transaction called at outer txn level. Trace: ~s",
 
277
                       [T]),
 
278
            erlang:exit(implementation_faulty);
 
279
        _N -> ok
 
280
    end,
 
281
    put(?NESTING_KEY, PreviousNestingLevel + 1),
 
282
    Result = (catch F()),
 
283
    put(?NESTING_KEY, PreviousNestingLevel),
 
284
    case Result of
 
285
        {aborted, Reason} ->
 
286
            {aborted, Reason};
 
287
        {'EXIT', Reason} ->
 
288
            {'EXIT', Reason};
 
289
        {atomic, Res} ->
 
290
            {atomic, Res};
 
291
        Res ->
 
292
            {atomic, Res}
 
293
    end.
 
294
 
 
295
outer_transaction(F, NRestarts, _Reason) ->
 
296
    PreviousNestingLevel = get(?NESTING_KEY),
 
297
    case get(?NESTING_KEY) of
 
298
        ?TOP_LEVEL_TXN ->
 
299
            ok;
 
300
        _N ->
 
301
            {backtrace, T} = process_info(self(), backtrace),
 
302
            ?ERROR_MSG("outer transaction called at inner txn level. Trace: ~s",
 
303
                       [T]),
 
304
            erlang:exit(implementation_faulty)
 
305
    end,
 
306
    sql_query_internal("begin;"),
 
307
    put(?NESTING_KEY, PreviousNestingLevel + 1),
 
308
    Result = (catch F()),
 
309
    put(?NESTING_KEY, PreviousNestingLevel),
 
310
    case Result of
 
311
        {aborted, Reason} when NRestarts > 0 ->
 
312
            %% Retry outer transaction upto NRestarts times.
 
313
            sql_query_internal("rollback;"),
 
314
            outer_transaction(F, NRestarts - 1, Reason);
 
315
        {aborted, Reason} when NRestarts =:= 0 ->
 
316
            %% Too many retries of outer transaction.
 
317
            ?ERROR_MSG("SQL transaction restarts exceeded~n"
 
318
                       "** Restarts: ~p~n"
 
319
                       "** Last abort reason: ~p~n"
 
320
                       "** Stacktrace: ~p~n"
 
321
                       "** When State == ~p",
 
322
                       [?MAX_TRANSACTION_RESTARTS, Reason,
 
323
                        erlang:get_stacktrace(), get(?STATE_KEY)]),
 
324
            sql_query_internal("rollback;"),
 
325
            {aborted, Reason};
 
326
        {'EXIT', Reason} ->
 
327
            %% Abort sql transaction on EXIT from outer txn only.
 
328
            sql_query_internal("rollback;"),
 
329
            {aborted, Reason};
 
330
        Res ->
 
331
            %% Commit successful outer txn
 
332
            sql_query_internal("commit;"),
 
333
            {atomic, Res}
 
334
    end.
 
335
 
 
336
execute_bloc(F) ->
 
337
    %% We don't alter ?NESTING_KEY here as only SQL transactions alter
 
338
    %% txn nesting
233
339
    case catch F() of
234
 
        aborted ->
235
 
            execute_transaction(State, F, NRestarts - 1);
236
 
        {'EXIT', Reason} ->
237
 
            sql_query_internal(State, "rollback;"),
238
 
            {aborted, Reason};
239
 
        Res ->
240
 
            sql_query_internal(State, "commit;"),
241
 
            {atomic, Res}
242
 
    end.
 
340
        {aborted, Reason} ->
 
341
            {aborted, Reason};
 
342
        {'EXIT', Reason} ->
 
343
            {aborted, Reason};
 
344
        Res ->
 
345
            {atomic, Res}
 
346
    end.
 
347
 
 
348
sql_query_internal(Query) ->
 
349
    State = get(?STATE_KEY),
 
350
    Res = case State#state.db_type of
 
351
              odbc ->
 
352
                  odbc:sql_query(State#state.db_ref, Query);
 
353
              pgsql ->
 
354
                  pgsql_to_odbc(pgsql:squery(State#state.db_ref, Query));
 
355
              mysql ->
 
356
                  ?DEBUG("MySQL, Send query~n~p~n", [Query]),
 
357
                  R = mysql_to_odbc(mysql_conn:fetch(State#state.db_ref,
 
358
                                                     Query, self())),
 
359
                  %% ?INFO_MSG("MySQL, Received result~n~p~n", [R]),
 
360
                  R
 
361
          end,
 
362
    case Res of
 
363
        {error, "No SQL-driver information available."} ->
 
364
            % workaround for odbc bug
 
365
            {updated, 0};
 
366
        _Else -> Res
 
367
    end.
 
368
 
 
369
%% Generate the OTP callback return tuple depending on the driver result.
 
370
abort_on_driver_error({error, "query timed out"} = Reply) ->
 
371
    %% mysql driver error
 
372
    {stop, timeout, Reply, get(?STATE_KEY)};
 
373
abort_on_driver_error({error, "Failed sending data on socket"++_} = Reply) ->
 
374
    %% mysql driver error
 
375
    {stop, closed, Reply, get(?STATE_KEY)};
 
376
abort_on_driver_error(Reply) ->
 
377
    {reply, Reply, get(?STATE_KEY)}.
 
378
 
243
379
 
244
380
%% == pure ODBC code
245
381
 
294
430
    {updated, list_to_integer(N)};
295
431
pgsql_item_to_odbc("DELETE " ++ N) ->
296
432
    {updated, list_to_integer(N)};
 
433
pgsql_item_to_odbc("UPDATE " ++ N) ->
 
434
    {updated, list_to_integer(N)};
297
435
pgsql_item_to_odbc({error, Error}) ->
298
436
    {error, Error};
299
437
pgsql_item_to_odbc(_) ->
307
445
    case mysql_conn:start(Server, Port, Username, Password, DB, fun log/3) of
308
446
        {ok, Ref} ->
309
447
            erlang:monitor(process, Ref),
310
 
            mysql_conn:fetch(Ref, ["set names 'utf8';"], self()), 
 
448
            mysql_conn:fetch(Ref, ["set names 'utf8';"], self()),
311
449
            {ok, #state{db_ref = Ref, db_type = mysql}};
312
450
        {error, Reason} ->
313
 
            ?ERROR_MSG("MySQL connection failed: ~p~nWaiting ~p seconds before retrying...~n",
 
451
            ?ERROR_MSG("MySQL connection failed: ~p~n"
 
452
                       "Waiting ~p seconds before retrying...~n",
314
453
                       [Reason, StartInterval div 1000]),
315
454
            %% If we can't connect we wait before retrying
316
455
            timer:sleep(StartInterval),
338
477
 
339
478
% perform a harmless query on all opened connexions to avoid connexion close.
340
479
keep_alive(PID) ->
341
 
    gen_server:call(PID, {sql_query, ?KEEPALIVE_QUERY}, 60000).
 
480
    gen_server:call(PID, {sql_cmd, {sql_query, ?KEEPALIVE_QUERY}},
 
481
                    ?KEEPALIVE_TIMEOUT).
342
482
 
343
483
% log function used by MySQL driver
344
484
log(Level, Format, Args) ->