1
%% ``The contents of this file are subject to the Erlang Public License,
2
%% Version 1.1, (the "License"); you may not use this file except in
3
%% compliance with the License. You should have received a copy of the
4
%% Erlang Public License along with this software. If not, it can be
5
%% retrieved via the world wide web at http://www.erlang.org/.
7
%% Software distributed under the License is distributed on an "AS IS"
8
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
9
%% the License for the specific language governing rights and limitations
12
%% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
13
%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
14
%% AB. All Rights Reserved.''
16
%% $Id: mnesia_monitor.erl,v 1.1 2008/12/17 09:53:38 mikpe Exp $
18
-module(mnesia_monitor).
20
-behaviour(gen_server).
26
detect_inconcistency/2,
32
needs_protocol_conversion/1,
51
%% gen_server callbacks
65
detect_partitioned_network/2,
66
has_remote_mnesia_down/1
69
-import(mnesia_lib, [dbg_out/2, verbose/2, error/2, fatal/2, set/2]).
71
-include("mnesia.hrl").
73
-record(state, {supervisor, pending_negotiators = [],
74
going_down = [], tm_started = false, early_connects = []}).
76
-define(current_protocol_version, {7,6}).
78
-define(previous_protocol_version, {7,5}).
81
gen_server:start_link({local, ?MODULE}, ?MODULE,
82
[self()], [{timeout, infinity}
89
%% Report that Node is down. From names the reporting stage; the
%% monitor's handle_cast/2 has clauses for mnesia_controller,
%% mnesia_tm (where the second element is {Node, Pending}) and
%% mnesia_locker.
mnesia_down(From, Node) ->
    Event = {mnesia_down, From, Node},
    cast(Event).
93
unsafe_call({mktab, Tab, Args}).
94
%% Ask the monitor process to create ets table Tab with options Args.
%% Unlike mktab/2, a creation failure is not fatal: the monitor replies
%% {error, ExitReason} instead.
unsafe_mktab(Tab, Args) ->
    Request = {unsafe_mktab, Tab, Args},
    unsafe_call(Request).
97
%% Ask the monitor process to open dets table Tab with options Args.
%% The monitor replies {ok, Tab} on success and calls fatal/2 when the
%% open fails.
open_dets(Tab, Args) ->
    Request = {open_dets, Tab, Args},
    unsafe_call(Request).
99
%% Like open_dets/2, but an open failure is handed back to the caller
%% as {error, Reason} instead of being treated as fatal; success gives
%% {ok, Tab}.
unsafe_open_dets(Tab, Args) ->
    Request = {unsafe_open_dets, Tab, Args},
    unsafe_call(Request).
103
unsafe_call({close_dets, Tab}).
105
%% Ask the monitor process to close dets table Name. The monitor
%% discards the result of mnesia_lib:dets_sync_close/1 rather than
%% checking it.
unsafe_close_dets(Name) ->
    Request = {unsafe_close_dets, Name},
    unsafe_call(Request).
109
unsafe_call({open_log, Args}).
111
%% Ask the monitor process to reopen disk_log Name on file Fname with
%% header Head (via disk_log:reopen/3). The monitor treats a failure as
%% fatal.
reopen_log(Name, Fname, Head) ->
    Request = {reopen_log, Name, Fname, Head},
    unsafe_call(Request).
115
unsafe_call({close_log, Name}).
117
%% Ask the monitor process to close disk_log Name. The monitor discards
%% the result of disk_log:close/1 instead of checking it.
unsafe_close_log(Name) ->
    Request = {unsafe_close_log, Name},
    unsafe_call(Request).
122
cast({disconnect, Node}).
124
%% Returns GoodNodes
125
%% Creates a link to each compatible monitor and sets
126
%% protocol_version to the agreed version upon success
128
%% Offer our mnesia version and acceptable protocols (preferred first)
%% to the monitor on every node in Nodes, then let check_protocol/2
%% process the replies. Nodes that could not be reached are ignored
%% here. The result is whatever check_protocol/2 returns: the nodes
%% whose monitors accepted.
negotiate_protocol(Nodes) ->
    MyVersion = mnesia:system_info(version),
    OwnProtocols = acceptable_protocol_versions(),
    Request = {negotiate_protocol, whereis(?MODULE), MyVersion, OwnProtocols},
    {Answers, _Unreachable} = multicall(Nodes, Request),
    check_protocol(Answers, OwnProtocols).
136
check_protocol([{Node, {accept, Mon, _Version, Protocol}} | Tail], Protocols) ->
137
case lists:member(Protocol, Protocols) of
139
case Protocol == protocol_version() of
141
set({protocol, Node}, {Protocol, false});
143
set({protocol, Node}, {Protocol, true})
145
[node(Mon) | check_protocol(Tail, Protocols)];
147
unlink(Mon), % Get rid of unneccessary link
148
check_protocol(Tail, Protocols)
150
check_protocol([{Node, {reject, _Mon, Version, Protocol}} | Tail], Protocols) ->
151
verbose("Failed to connect with ~p. ~p protocols rejected. "
152
"expected version = ~p, expected protocol = ~p~n",
153
[Node, Protocols, Version, Protocol]),
154
check_protocol(Tail, Protocols);
155
check_protocol([{error, _Reason} | Tail], Protocols) ->
156
check_protocol(Tail, Protocols);
157
check_protocol([{badrpc, _Reason} | Tail], Protocols) ->
158
check_protocol(Tail, Protocols);
159
check_protocol([], [Protocol | _Protocols]) ->
160
set(protocol_version, Protocol),
162
check_protocol([], []) ->
163
set(protocol_version, protocol_version()),
166
protocol_version() ->
167
case ?catch_val(protocol_version) of
168
{'EXIT', _} -> ?current_protocol_version;
172
%% A sorted list of acceptable protocols; the
173
%% preferred protocols come first in the list
174
%% The protocol in use is offered first, with the previous protocol
%% version as the fallback. If both are equal the version appears twice,
%% which is harmless: callers only test membership or take the head.
acceptable_protocol_versions() ->
    Preferred = protocol_version(),
    Fallback = ?previous_protocol_version,
    [Preferred, Fallback].
177
needs_protocol_conversion(Node) ->
178
case {?catch_val({protocol, Node}), protocol_version()} of
181
{{_, Bool}, ?current_protocol_version} ->
188
case whereis(?MODULE) of
190
Pid -> gen_server:cast(Pid, Msg)
194
case whereis(?MODULE) of
195
undefined -> {error, {node_not_running, node()}};
196
Pid -> gen_server:call(Pid, Msg, infinity)
200
case whereis(?MODULE) of
202
{error, {node_not_running, node()}};
205
Res = gen_server:call(Pid, Msg, infinity),
208
%% We get an exit signal if server dies
210
{'EXIT', Pid, _Reason} ->
211
{error, {node_not_running, node()}}
218
%% Apply call/1 to Msg in ?MODULE on every node in Nodes through
%% rpc:multicall/4, returning rpc's {Replies, BadNodes} pair unchanged.
multicall(Nodes, Msg) ->
    CallArgs = [Msg],
    rpc:multicall(Nodes, ?MODULE, call, CallArgs).
221
%% Start a linked process with proc_lib:start_link/4. The process runs
%% mnesia_sp:init_proc(Who, Mod, Fun, Args). With the infinity timeout,
%% the caller waits without limit for the new process to acknowledge
%% its start.
start_proc(Who, Mod, Fun, Args) ->
    proc_lib:start_link(mnesia_sp, init_proc, [Who, Mod, Fun, Args], infinity).
225
terminate_proc(Who, R, State) when R /= shutdown, R /= killed ->
226
fatal("~p crashed: ~p state: ~p~n", [Who, R, State]);
228
terminate_proc(Who, Reason, _State) ->
229
mnesia_lib:verbose("~p terminated: ~p~n", [Who, Reason]),
232
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
233
%%% Callback functions from gen_server
235
%%----------------------------------------------------------------------
237
%% Returns: {ok, State} |
238
%% {ok, State, Timeout} |
240
%%----------------------------------------------------------------------
242
process_flag(trap_exit, true),
243
?ets_new_table(mnesia_gvar, [set, public, named_table]),
244
set(subscribers, []),
245
mnesia_lib:verbose("~p starting: ~p~n", [?MODULE, self()]),
246
Version = mnesia:system_info(version),
247
set(version, Version),
248
dbg_out("Version: ~p~n", [Version]),
250
case catch process_config_args(env()) of
252
mnesia_lib:set({'$$$_report', current_pos}, 0),
253
Level = mnesia_lib:val(debug),
254
mnesia_lib:verbose("Mnesia debug level set to ~p\n", [Level]),
255
set(mnesia_status, starting), %% set start status
256
set({current, db_nodes}, [node()]),
257
set(use_dir, use_dir()),
258
mnesia_lib:create_counter(trans_aborts),
259
mnesia_lib:create_counter(trans_commits),
260
mnesia_lib:create_counter(trans_log_writes),
261
Left = get_env(dump_log_write_threshold),
262
mnesia_lib:set_counter(trans_log_writes_left, Left),
263
mnesia_lib:create_counter(trans_log_writes_prev),
264
mnesia_lib:create_counter(trans_restarts),
265
mnesia_lib:create_counter(trans_failures),
266
?ets_new_table(mnesia_held_locks, [bag, public, named_table]),
267
?ets_new_table(mnesia_tid_locks, [bag, public, named_table]),
268
?ets_new_table(mnesia_sticky_locks, [set, public, named_table]),
269
?ets_new_table(mnesia_lock_queue,
270
[bag, public, named_table, {keypos, 2}]),
271
?ets_new_table(mnesia_lock_counter, [set, public, named_table]),
272
set(checkpoints, []),
273
set(pending_checkpoints, []),
274
set(pending_checkpoint_pids, []),
276
{ok, #state{supervisor = Parent}};
278
mnesia_lib:report_fatal("Bad configuration: ~p~n", [Reason]),
279
{stop, {bad_config, Reason}}
283
case ?catch_val(use_dir) of
285
case get_env(schema_location) of
287
opt_disc -> non_empty_dir();
294
%% Returns true if the Mnesia directory contains
297
mnesia_lib:exists(mnesia_bup:fallback_bup()) or
298
mnesia_lib:exists(mnesia_lib:tab2dmp(schema)) or
299
mnesia_lib:exists(mnesia_lib:tab2dat(schema)).
301
%%----------------------------------------------------------------------
302
%% Func: handle_call/3
303
%% Returns: {reply, Reply, State} |
304
%% {reply, Reply, State, Timeout} |
305
%% {noreply, State} |
306
%% {noreply, State, Timeout} |
307
%% {stop, Reason, Reply, State} | (terminate/2 is called)
308
%%----------------------------------------------------------------------
310
handle_call({mktab, Tab, Args}, _From, State) ->
311
case catch ?ets_new_table(Tab, Args) of
312
{'EXIT', ExitReason} ->
313
Msg = "Cannot create ets table",
314
Reason = {system_limit, Msg, Tab, Args, ExitReason},
315
fatal("~p~n", [Reason]),
318
{reply, Reply, State}
321
handle_call({unsafe_mktab, Tab, Args}, _From, State) ->
322
case catch ?ets_new_table(Tab, Args) of
323
{'EXIT', ExitReason} ->
324
{reply, {error, ExitReason}, State};
326
{reply, Reply, State}
330
handle_call({open_dets, Tab, Args}, _From, State) ->
331
case mnesia_lib:dets_sync_open(Tab, Args) of
333
{reply, {ok, Tab}, State};
336
Msg = "Cannot open dets table",
337
Error = {error, {Msg, Tab, Args, Reason}},
338
fatal("~p~n", [Error]),
342
handle_call({unsafe_open_dets, Tab, Args}, _From, State) ->
343
case mnesia_lib:dets_sync_open(Tab, Args) of
345
{reply, {ok, Tab}, State};
347
{reply, {error,Reason}, State}
350
handle_call({close_dets, Tab}, _From, State) ->
351
case mnesia_lib:dets_sync_close(Tab) of
355
Msg = "Cannot close dets table",
356
Error = {error, {Msg, Tab, Reason}},
357
fatal("~p~n", [Error]),
361
handle_call({unsafe_close_dets, Tab}, _From, State) ->
362
mnesia_lib:dets_sync_close(Tab),
365
handle_call({open_log, Args}, _From, State) ->
366
Res = disk_log:open([{notify, true}|Args]),
369
handle_call({reopen_log, Name, Fname, Head}, _From, State) ->
370
case disk_log:reopen(Name, Fname, Head) of
375
Msg = "Cannot rename disk_log file",
376
Error = {error, {Msg, Name, Fname, Head, Reason}},
377
fatal("~p~n", [Error]),
381
handle_call({close_log, Name}, _From, State) ->
382
case disk_log:close(Name) of
387
Msg = "Cannot close disk_log file",
388
Error = {error, {Msg, Name, Reason}},
389
fatal("~p~n", [Error]),
393
handle_call({unsafe_close_log, Name}, _From, State) ->
394
disk_log:close(Name),
397
handle_call({negotiate_protocol, Mon, _Version, _Protocols}, _From, State)
398
when State#state.tm_started == false ->
399
State2 = State#state{early_connects = [node(Mon) | State#state.early_connects]},
400
{reply, {node(), {reject, self(), uninitialized, uninitialized}}, State2};
402
handle_call({negotiate_protocol, Mon, Version, Protocols}, From, State)
403
when node(Mon) /= node() ->
404
Protocol = protocol_version(),
405
MyVersion = mnesia:system_info(version),
406
case lists:member(Protocol, Protocols) of
408
accept_protocol(Mon, MyVersion, Protocol, From, State);
410
%% in this release we should be able to handle the previous
412
case hd(Protocols) of
413
?previous_protocol_version ->
414
accept_protocol(Mon, MyVersion, ?previous_protocol_version, From, State);
416
verbose("Connection with ~p rejected. "
417
"version = ~p, protocols = ~p, "
418
"expected version = ~p, expected protocol = ~p~n",
419
[node(Mon), Version, Protocols, MyVersion, Protocol]),
420
{reply, {node(), {reject, self(), MyVersion, Protocol}}, State}
424
handle_call(init, _From, State) ->
425
net_kernel:monitor_nodes(true),
426
EarlyNodes = State#state.early_connects,
427
State2 = State#state{tm_started = true},
428
{reply, EarlyNodes, State2};
430
handle_call(Msg, _From, State) ->
431
error("~p got unexpected call: ~p~n", [?MODULE, Msg]),
434
accept_protocol(Mon, Version, Protocol, From, State) ->
435
Reply = {node(), {accept, self(), Version, Protocol}},
437
Pending0 = State#state.pending_negotiators,
438
Pending = lists:keydelete(Node, 1, Pending0),
439
case lists:member(Node, State#state.going_down) of
441
%% Wait for the mnesia_down to be processed,
443
P = Pending ++ [{Node, Mon, From, Reply}],
444
{noreply, State#state{pending_negotiators = P}};
447
link(Mon), %% link to remote Monitor
448
case Protocol == protocol_version() of
450
set({protocol, Node}, {Protocol, false});
452
set({protocol, Node}, {Protocol, true})
454
{reply, Reply, State#state{pending_negotiators = Pending}}
457
%%----------------------------------------------------------------------
458
%% Func: handle_cast/2
459
%% Returns: {noreply, State} |
460
%% {noreply, State, Timeout} |
461
%% {stop, Reason, State} (terminate/2 is called)
462
%%----------------------------------------------------------------------
464
handle_cast({mnesia_down, mnesia_controller, Node}, State) ->
465
mnesia_tm:mnesia_down(Node),
468
handle_cast({mnesia_down, mnesia_tm, {Node, Pending}}, State) ->
469
mnesia_locker:mnesia_down(Node, Pending),
472
handle_cast({mnesia_down, mnesia_locker, Node}, State) ->
473
Down = {mnesia_down, Node},
474
mnesia_lib:report_system_event(Down),
475
GoingDown = lists:delete(Node, State#state.going_down),
476
State2 = State#state{going_down = GoingDown},
477
Pending = State#state.pending_negotiators,
478
case lists:keysearch(Node, 1, Pending) of
479
{value, {Node, Mon, ReplyTo, Reply}} ->
480
%% Late reply to remote monitor
481
link(Mon), %% link to remote Monitor
482
gen_server:reply(ReplyTo, Reply),
483
P2 = lists:keydelete(Node, 1,Pending),
484
State3 = State2#state{pending_negotiators = P2},
487
%% No pending remote monitors
491
handle_cast({disconnect, Node}, State) ->
492
case rpc:call(Node, erlang, whereis, [?MODULE]) of
495
RemoteMon when pid(RemoteMon) ->
500
handle_cast({inconsistent_database, Context, Node}, State) ->
501
Msg = {inconsistent_database, Context, Node},
502
mnesia_lib:report_system_event(Msg),
505
handle_cast(Msg, State) ->
506
error("~p got unexpected cast: ~p~n", [?MODULE, Msg]),
509
%%----------------------------------------------------------------------
510
%% Func: handle_info/2
511
%% Returns: {noreply, State} |
512
%% {noreply, State, Timeout} |
513
%% {stop, Reason, State} (terminate/2 is called)
514
%%----------------------------------------------------------------------
516
handle_info({'EXIT', Pid, R}, State) when Pid == State#state.supervisor ->
517
dbg_out("~p was ~p by supervisor~n",[?MODULE, R]),
520
handle_info({'EXIT', Pid, fatal}, State) when node(Pid) == node() ->
521
dbg_out("~p got FATAL ERROR from: ~p~n",[?MODULE, Pid]),
522
exit(State#state.supervisor, shutdown),
525
handle_info({'EXIT', Pid, Reason}, State) ->
529
%% Remotely linked process died, assume that it was a mnesia_monitor
530
mnesia_recover:mnesia_down(Node),
531
mnesia_controller:mnesia_down(Node),
532
{noreply, State#state{going_down = [Node | State#state.going_down]}};
534
%% We have probably got an exit signal from
536
Hint = "Hint: check that the disk still is writable",
537
Msg = {'EXIT', Pid, Reason},
538
fatal("~p got unexpected info: ~p; ~p~n",
539
[?MODULE, Msg, Hint])
542
handle_info({nodeup, Node}, State) ->
543
%% Ok, we are connected to yet another Erlang node
544
%% Let's check if Mnesia is running there in order
545
%% to detect if the network has been partitioned
546
%% due to communication failure.
548
HasDown = mnesia_recover:has_mnesia_down(Node),
549
ImRunning = mnesia_lib:is_running(),
552
%% If I'm not running, the test will be made later.
553
HasDown == true, ImRunning == yes ->
554
spawn_link(?MODULE, detect_partitioned_network, [self(), Node]);
560
handle_info({nodedown, _Node}, State) ->
561
%% Ignore, we are only caring about nodeup's
564
handle_info({disk_log, _Node, Log, Info}, State) ->
569
mnesia_lib:important("Warning Log file ~p error reason ~s~n",
570
[Log, disk_log:format_error(Info)])
574
handle_info(Msg, State) ->
575
error("~p got unexpected info (~p): ~p~n", [?MODULE, State, Msg]).
577
%%----------------------------------------------------------------------
579
%% Purpose: Shutdown the server
580
%% Returns: any (ignored by gen_server)
581
%%----------------------------------------------------------------------
582
%% gen_server terminate/2 callback: delegate to terminate_proc/3,
%% tagging the report with this module's name.
terminate(Reason, State) ->
    Who = ?MODULE,
    terminate_proc(Who, Reason, State).
585
%%----------------------------------------------------------------------
586
%% Func: code_change/3
587
%% Purpose: Upgrade process when its code is to be changed
588
%% Returns: {ok, NewState}
589
%%----------------------------------------------------------------------
591
code_change(_OldVsn, State, _Extra) ->
594
%%%----------------------------------------------------------------------
595
%%% Internal functions
596
%%%----------------------------------------------------------------------
598
process_config_args([]) ->
600
process_config_args([C|T]) ->
602
dbg_out("Env ~p: ~p~n", [C, V]),
603
mnesia_lib:set(C, V),
604
process_config_args(T).
607
mnesia_lib:set(E, check_type(E,Val)),
611
case ?catch_val(E) of
613
case application:get_env(mnesia, E) of
617
check_type(E, default_env(E))
630
dump_log_load_regulation,
631
dump_log_time_threshold,
632
dump_log_update_in_place,
633
dump_log_write_threshold,
637
ignore_fallback_at_startup,
638
fallback_error_function,
639
max_wait_for_decision,
644
default_env(access_module) ->
646
default_env(auto_repair) ->
648
default_env(backup_module) ->
650
default_env(debug) ->
653
Name = lists:concat(["Mnesia.", node()]),
654
filename:absname(Name);
655
default_env(dump_log_load_regulation) ->
657
default_env(dump_log_time_threshold) ->
659
default_env(dump_log_update_in_place) ->
661
default_env(dump_log_write_threshold) ->
663
default_env(embedded_mnemosyne) ->
665
default_env(event_module) ->
667
default_env(extra_db_nodes) ->
669
default_env(ignore_fallback_at_startup) ->
671
default_env(fallback_error_function) ->
673
default_env(max_wait_for_decision) ->
675
default_env(schema_location) ->
677
default_env(core_dir) ->
680
check_type(Env, Val) ->
681
case catch do_check_type(Env, Val) of
683
exit({bad_config, Env, Val});
688
do_check_type(access_module, A) when atom(A) -> A;
689
do_check_type(auto_repair, B) -> bool(B);
690
do_check_type(backup_module, B) when atom(B) -> B;
691
do_check_type(debug, debug) -> debug;
692
do_check_type(debug, false) -> none;
693
do_check_type(debug, none) -> none;
694
do_check_type(debug, trace) -> trace;
695
do_check_type(debug, true) -> debug;
696
do_check_type(debug, verbose) -> verbose;
697
do_check_type(dir, V) -> filename:absname(V);
698
do_check_type(dump_log_load_regulation, B) -> bool(B);
699
do_check_type(dump_log_time_threshold, I) when integer(I), I > 0 -> I;
700
do_check_type(dump_log_update_in_place, B) -> bool(B);
701
do_check_type(dump_log_write_threshold, I) when integer(I), I > 0 -> I;
702
do_check_type(event_module, A) when atom(A) -> A;
703
do_check_type(ignore_fallback_at_startup, B) -> bool(B);
704
do_check_type(fallback_error_function, {Mod, Func})
705
when atom(Mod), atom(Func) -> {Mod, Func};
706
do_check_type(embedded_mnemosyne, B) -> bool(B);
707
do_check_type(extra_db_nodes, L) when list(L) ->
708
Fun = fun(N) when N == node() -> false;
709
(A) when atom(A) -> true
711
lists:filter(Fun, L);
712
do_check_type(max_wait_for_decision, infinity) -> infinity;
713
do_check_type(max_wait_for_decision, I) when integer(I), I > 0 -> I;
714
do_check_type(schema_location, M) -> media(M);
715
do_check_type(core_dir, "false") -> false;
716
do_check_type(core_dir, false) -> false;
717
do_check_type(core_dir, Dir) when list(Dir) -> Dir.
721
bool(false) -> false.
724
media(opt_disc) -> opt_disc;
727
patch_env(Env, Val) ->
728
case catch do_check_type(Env, Val) of
730
{error, {bad_type, Env, Val}};
732
application_controller:set_env(mnesia, Env, NewVal),
736
detect_partitioned_network(Mon, Node) ->
737
GoodNodes = negotiate_protocol([Node]),
738
detect_inconcistency(GoodNodes, running_partitioned_network),
742
detect_inconcistency([], _Context) ->
744
detect_inconcistency(Nodes, Context) ->
745
Downs = [N || N <- Nodes, mnesia_recover:has_mnesia_down(N)],
746
{Replies, _BadNodes} =
747
rpc:multicall(Downs, ?MODULE, has_remote_mnesia_down, [node()]),
748
report_inconsistency(Replies, Context, ok).
750
has_remote_mnesia_down(Node) ->
751
HasDown = mnesia_recover:has_mnesia_down(Node),
752
Master = mnesia_recover:get_master_nodes(schema),
754
HasDown == true, Master == [] ->
760
report_inconsistency([{true, Node} | Replies], Context, _Status) ->
761
%% Oops, Mnesia is already running on the
762
%% other node AND we both regard each
763
%% other as down. The database is
764
%% potentially inconsistent and we have to
765
%% tell the applications about it, so
766
%% they may perform some clever recovery
768
Msg = {inconsistent_database, Context, Node},
769
mnesia_lib:report_system_event(Msg),
770
report_inconsistency(Replies, Context, inconsistent_database);
771
report_inconsistency([{false, _Node} | Replies], Context, Status) ->
772
report_inconsistency(Replies, Context, Status);
773
report_inconsistency([{badrpc, _Reason} | Replies], Context, Status) ->
774
report_inconsistency(Replies, Context, Status);
775
report_inconsistency([], _Context, Status) ->