%% ``The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved via the world wide web at http://www.erlang.org/.
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
%%
%% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
%% AB. All Rights Reserved.''
%%
%% $Id: mnesia_controller.erl,v 1.3 2010/03/04 13:54:19 maria Exp $
%%
%% The mnesia_init process loads tables from local disc or from
%% other nodes. It also coordinates updates of the info about
%% where we can read and write tables.
%%
%% Tables may need to be loaded initially at startup of the local
%% node or when other nodes announce that they already have loaded
%% tables that we also want.
%%
%% Initially we set the load request queue to those tables that we
%% safely can load locally, i.e. tables where we have the last
%% consistent replica and we have received mnesia_down from all
%% other nodes holding the table. Then we let the mnesia_init
%% process enter its normal working state.
%%
%% When we need to load a table we append a request to the load
%% request queue. All other requests are regarded as high priority
%% and are processed immediately (e.g. update table whereabouts).
%% We process the load request queue as a "background" job.
37
-module(mnesia_controller).
39
-behaviour(gen_server).
41
%% Mnesia internal stuff
52
wait_for_schema_commit_lock/0,
53
release_schema_commit_lock/0,
57
sync_and_block_table_whereabouts/4,
58
sync_del_table_copy_whereabouts/2,
63
unannounce_add_table_copy/2,
64
master_nodes_updated/2,
69
change_table_access_mode/1,
74
start_remote_sender/4,
75
schedule_late_disc_load/2
78
%% gen_server callbacks
86
%% Module internal stuff
92
wait_for_tables_init/2
95
-import(mnesia_lib, [set/2, add/2]).
96
-import(mnesia_lib, [fatal/2, error/2, verbose/2, dbg_out/2]).
98
-include("mnesia.hrl").
100
-define(SERVER_NAME, ?MODULE).
102
-record(state, {supervisor,
103
schema_is_merged = false,
109
late_loader_queue = [],
110
dumper_pid, % Dumper or schema commit pid
111
dumper_queue = [], % Dumper or schema commit queue
116
-record(worker_reply, {what,
121
-record(schema_commit_lock, {owner}).
122
-record(block_controller, {owner}).
124
-record(dump_log, {initiated_by,
128
-record(net_load, {table,
134
-record(send_table, {table,
139
-record(disc_load, {table,
144
-record(late_load, {table,
150
-record(loader_done, {worker_pid,
159
-record(sender_done, {worker_pid,
164
-record(dumper_done, {worker_pid,
169
case ?catch_val(Var) of
170
{'EXIT', Reason} -> mnesia_lib:other_val(Var, Reason);
175
gen_server:start_link({local, ?SERVER_NAME}, ?MODULE, [self()],
180
%% Ask the controller to dump the transaction log and block the caller
%% until the dump has completed.  InitBy records who initiated the dump.
sync_dump_log(InitBy) ->
    call({sync_dump_log, InitBy}).
183
%% Ask the controller to dump the transaction log without waiting for
%% the dump to finish (plain message send, no reply expected).
async_dump_log(InitBy) ->
    erlang:send(?SERVER_NAME, {async_dump_log, InitBy}).
186
%% Wait for tables to be active
%% If needed, we will wait for Mnesia to start
%% If Mnesia stops, we will wait for Mnesia to restart
%% We will wait even if the list of tables is empty

%% Block the caller until all tables in Tabs are active, or until
%% Timeout (milliseconds, or the atom 'infinity') expires.
%% Bad arguments yield {error, {badarg, Tabs, Timeout}} rather than
%% a crash.  Uses is_list/1 / is_integer/1 instead of the removed
%% old-style guard tests list/1 / integer/1.
wait_for_tables(Tabs, Timeout) when is_list(Tabs), Timeout =:= infinity ->
    do_wait_for_tables(Tabs, Timeout);
wait_for_tables(Tabs, Timeout) when is_list(Tabs),
                                    is_integer(Timeout), Timeout >= 0 ->
    do_wait_for_tables(Tabs, Timeout);
wait_for_tables(Tabs, Timeout) ->
    {error, {badarg, Tabs, Timeout}}.
199
do_wait_for_tables(Tabs, 0) ->
201
do_wait_for_tables(Tabs, Timeout) ->
202
Pid = spawn_link(?MODULE, wait_for_tables_init, [self(), Tabs]),
204
{?SERVER_NAME, Pid, Res} ->
217
case catch mnesia_lib:active_tables() of
219
{error, {node_not_running, node()}};
220
Active when list(Active) ->
221
case Tabs -- Active of
229
wait_for_tables_init(From, Tabs) ->
230
process_flag(trap_exit, true),
231
Res = wait_for_init(From, Tabs, whereis(?SERVER_NAME)),
232
From ! {?SERVER_NAME, self(), Res},
236
wait_for_init(From, Tabs, Init) ->
237
case catch link(Init) of
239
%% Mnesia is not started
240
{error, {node_not_running, node()}};
241
true when pid(Init) ->
242
cast({sync_tabs, Tabs, self()}),
243
rec_tabs(Tabs, Tabs, From, Init)
246
%% Notify a process waiting in wait_for_tables that Tab is now synced.
sync_reply(Waiter, Tab) ->
    erlang:send(Waiter, {?SERVER_NAME, {tab_synced, Tab}}).
249
rec_tabs([Tab | Tabs], AllTabs, From, Init) ->
251
{?SERVER_NAME, {tab_synced, Tab}} ->
252
rec_tabs(Tabs, AllTabs, From, Init);
255
%% This will trigger an exit signal
257
exit(wait_for_tables_timeout);
260
%% Oops, mnesia_init stopped,
263
rec_tabs([], _, _, Init) ->
271
case cast({mnesia_down, Node}) of
272
{error, _} -> mnesia_monitor:mnesia_down(?SERVER_NAME, Node);
275
%% Acquire the schema commit lock held by the controller.  We link to
%% the server first so a crash of either party is noticed; the lock is
%% dropped again via release_schema_commit_lock/0.
wait_for_schema_commit_lock() ->
    Controller = whereis(?SERVER_NAME),
    link(Controller),
    unsafe_call(wait_for_schema_commit_lock).
279
%% Stop the controller from scheduling new work until
%% unblock_controller/0 is called.  Blocks until the request is granted.
block_controller() ->
    call(block_controller).
282
%% Let a controller blocked by block_controller/0 resume its work.
unblock_controller() ->
    cast(unblock_controller).
285
%% Release the schema commit lock taken by
%% wait_for_schema_commit_lock/0 and drop the link to the controller.
release_schema_commit_lock() ->
    cast({release_schema_commit_lock, self()}),
    Controller = whereis(?SERVER_NAME),
    unlink(Controller).
289
%% Special for preparation of add table copy
290
get_network_copy(Tab, Cs) ->
291
Work = #net_load{table = Tab,
292
reason = {dumper, add_table_copy},
295
Res = (catch load_table(Work)),
296
if Res#loader_done.is_loaded == true ->
297
Tab = Res#loader_done.table_name,
298
case Res#loader_done.needs_announce of
307
receive %% Flush copier done message
308
{copier_done, _Node} ->
310
after 500 -> %% avoid hanging if something is wrong and we shall fail.
313
Res#loader_done.reply.
315
%% This functions is invoked from the dumper
317
%% There are two cases here:
319
%% no need for sync, since mnesia_controller not started yet
321
%% already synced with mnesia_controller since the dumper
322
%% is syncronously started from mnesia_controller
325
{loaded, ok} = mnesia_loader:disc_load_table(Tab, {dumper,create_table}).
327
%% Load the local disc copy of Tab on behalf of the dumper when the
%% storage type of the table is being changed.
get_disc_copy(Tab) ->
    disc_load_table(Tab, {dumper, change_table_copy_type}, undefined).
330
%% Returns ok instead of yes
331
force_load_table(Tab) when atom(Tab), Tab /= schema ->
332
case ?catch_val({Tab, storage_type}) of
334
do_force_load_table(Tab);
336
do_force_load_table(Tab);
338
do_force_load_table(Tab);
340
set({Tab, load_by_force}, true),
341
cast({force_load_updated, Tab}),
342
wait_for_tables([Tab], infinity);
344
{error, {no_exists, Tab}}
346
force_load_table(Tab) ->
347
{error, {bad_type, Tab}}.
349
do_force_load_table(Tab) ->
350
Loaded = ?catch_val({Tab, load_reason}),
353
set({Tab, load_by_force}, true),
354
mnesia_late_loader:async_late_disc_load(node(), [Tab], forced_by_user),
355
wait_for_tables([Tab], infinity);
357
set({Tab, load_by_force}, true),
358
mnesia_late_loader:async_late_disc_load(node(), [Tab], forced_by_user),
359
wait_for_tables([Tab], infinity);
363
master_nodes_updated(schema, _Masters) ->
365
master_nodes_updated(Tab, Masters) ->
366
cast({master_nodes_updated, Tab, Masters}).
368
%% Schedule a late disc load of Tabs.  The late_disc_load tag marks an
%% ordinary request (as opposed to the schema_is_merged tag used for
%% the initial load after a schema merge).
schedule_late_disc_load(Tabs, Reason) ->
    try_schedule_late_disc_load(Tabs, Reason, late_disc_load).
372
try_schedule_late_disc_load(Tabs, _Reason, MsgTag)
373
when Tabs == [], MsgTag /= schema_is_merged ->
375
try_schedule_late_disc_load(Tabs, Reason, MsgTag) ->
378
Item = mnesia_late_disc_load,
379
Nodes = val({current, db_nodes}),
380
mnesia:lock({global, Item, Nodes}, write),
381
case multicall(Nodes -- [node()], disc_load_intents) of
383
call({MsgTag, Tabs, Reason, Replies}),
386
%% Some nodes did not respond, lets try again
390
case mnesia:transaction(GetIntents) of
393
{'atomic', {retry, BadNodes}} ->
394
verbose("Retry late_load_tables because bad nodes: ~p~n",
396
try_schedule_late_disc_load(Tabs, Reason, MsgTag);
397
{aborted, AbortReason} ->
398
fatal("Cannot late_load_tables~p: ~p~n",
399
[[Tabs, Reason, MsgTag], AbortReason])
403
case mnesia:system_info(is_running) of
405
{error, {node_not_running, node()}};
407
{NewC, OldC} = mnesia_recover:connect_nodes(Ns),
408
Connected = NewC ++OldC,
409
New1 = mnesia_lib:intersect(Ns, Connected),
410
New = New1 -- val({current, db_nodes}),
412
case try_merge_schema(New) of
414
mnesia_lib:add_list(extra_db_nodes, New),
416
{aborted, {throw, Str}} when list(Str) ->
417
%%mnesia_recover:disconnect_nodes(New),
418
{error, {merge_schema_failed, lists:flatten(Str)}};
420
%% Unconnect nodes where merge failed!!
421
%% mnesia_recover:disconnect_nodes(New),
426
%% Merge the local schema with the schema on other nodes.
427
%% But first we must let all processes that want to force
428
%% load tables wait until the schema merge is done.
431
AllNodes = mnesia_lib:all_nodes(),
432
case try_merge_schema(AllNodes) of
435
{aborted, {throw, Str}} when list(Str) ->
436
fatal("Failed to merge schema: ~s~n", [Str]);
438
fatal("Failed to merge schema: ~p~n", [Else])
441
try_merge_schema(Nodes) ->
442
case mnesia_schema:merge_schema() of
443
{'atomic', not_merged} ->
444
%% No more nodes that we need to merge the schema with
446
{'atomic', {merged, OldFriends, NewFriends}} ->
447
%% Check if new nodes has been added to the schema
448
Diff = mnesia_lib:all_nodes() -- [node() | Nodes],
449
mnesia_recover:connect_nodes(Diff),
451
%% Tell everybody to adopt orphan tables
452
im_running(OldFriends, NewFriends),
453
im_running(NewFriends, OldFriends),
455
try_merge_schema(Nodes);
456
{'atomic', {"Cannot get cstructs", Node, Reason}} ->
457
dbg_out("Cannot get cstructs, Node ~p ~p~n", [Node, Reason]),
458
timer:sleep(1000), % Avoid a endless loop look alike
459
try_merge_schema(Nodes);
464
%% Broadcast to OldFriends that this node is up, telling them which
%% new friends came along with it.
im_running(OldFriends, NewFriends) ->
    Msg = {im_running, node(), NewFriends},
    abcast(OldFriends, Msg).
467
%% Invoked once the schema merge is finished.  Schedules a disc load
%% of the tables that are known to be safe to load locally.
schema_is_merged() ->
    SafeLoads = initial_safe_loads(),
    %% At this point we do not know anything about which tables the
    %% other nodes have already loaded, so we let the normal
    %% processing of the loader_queue take care of it, since by then
    %% we will know the whereabouts.  We rely on the fact that all
    %% nodes tell each other directly when they have loaded a table
    %% and are willing to share it.
    try_schedule_late_disc_load(SafeLoads, initial, schema_is_merged).
485
case whereis(?SERVER_NAME) of
486
undefined ->{error, {node_not_running, node()}};
487
Pid -> gen_server:cast(Pid, Msg)
490
%% Asynchronously broadcast Msg to the controller on every node in Nodes.
abcast(Nodes, Msg) ->
    gen_server:abcast(Nodes, ?SERVER_NAME, Msg).
494
case whereis(?SERVER_NAME) of
495
undefined -> {error, {node_not_running, node()}};
496
Pid -> gen_server:call(Pid, Msg, infinity)
500
case whereis(?SERVER_NAME) of
502
{error, {node_not_running, node()}};
505
Res = gen_server:call(Pid, Msg, infinity),
508
%% We get an exit signal if server dies
510
{'EXIT', Pid, _Reason} ->
511
{error, {node_not_running, node()}}
518
remote_call(Node, Func, Args) ->
519
case catch gen_server:call({?MODULE, Node}, {Func, Args, self()}, infinity) of
526
%% Call the controller on all Nodes and gather the replies.
%% Returns {Replies, BadNodes}, shaped like an rpc:multicall/4 result.
multicall(Nodes, Msg) ->
    {Good, Bad} = gen_server:multi_call(Nodes, ?MODULE, Msg, infinity),
    %% Strip the node tags so the replies look like rpc:multicall's.
    Replies = [Reply || {_Node, Reply} <- Good],
    {Replies, Bad}.
532
%%%----------------------------------------------------------------------
533
%%% Callback functions from gen_server
534
%%%----------------------------------------------------------------------
536
%%----------------------------------------------------------------------
538
%% Returns: {ok, State} |
539
%% {ok, State, Timeout} |
541
%%----------------------------------------------------------------------
543
process_flag(trap_exit, true),
544
mnesia_lib:verbose("~p starting: ~p~n", [?SERVER_NAME, self()]),
546
%% Handshake and initialize transaction recovery
547
%% for new nodes detected in the schema
548
All = mnesia_lib:all_nodes(),
549
Diff = All -- [node() | val(original_nodes)],
550
mnesia_lib:unset(original_nodes),
551
mnesia_recover:connect_nodes(Diff),
553
Interval = mnesia_monitor:get_env(dump_log_time_threshold),
554
Msg = {async_dump_log, time_threshold},
555
{ok, Ref} = timer:send_interval(Interval, Msg),
556
mnesia_dumper:start_regulator(),
558
{ok, #state{supervisor = Parent, dump_log_timer_ref = Ref}}.
560
%%----------------------------------------------------------------------
%% Func: handle_call/3
%% Returns: {reply, Reply, State} |
%%          {reply, Reply, State, Timeout} |
%%          {noreply, State} |
%%          {noreply, State, Timeout} |
%%          {stop, Reason, Reply, State} | (terminate/2 is called)
%%          {stop, Reason, State} (terminate/2 is called)
%%----------------------------------------------------------------------
570
handle_call({sync_dump_log, InitBy}, From, State) ->
571
Worker = #dump_log{initiated_by = InitBy,
574
State2 = add_worker(Worker, State),
577
handle_call(wait_for_schema_commit_lock, From, State) ->
578
Worker = #schema_commit_lock{owner = From},
579
State2 = add_worker(Worker, State),
582
handle_call(block_controller, From, State) ->
583
Worker = #block_controller{owner = From},
584
State2 = add_worker(Worker, State),
588
handle_call(get_cstructs, From, State) ->
589
Tabs = val({schema, tables}),
590
Cstructs = [val({T, cstruct}) || T <- Tabs],
591
Running = val({current, db_nodes}),
592
reply(From, {cstructs, Cstructs, Running}),
595
handle_call({schema_is_merged, TabsR, Reason, RemoteLoaders}, From, State) ->
596
State2 = late_disc_load(TabsR, Reason, RemoteLoaders, From, State),
598
%% Handle early messages
599
Msgs = State2#state.early_msgs,
600
State3 = State2#state{early_msgs = [], schema_is_merged = true},
601
Ns = val({current, db_nodes}),
602
dbg_out("Schema is merged ~w, State ~w~n", [Ns, State3]),
603
%% dbg_out("handle_early_msgs ~p ~n", [Msgs]), % qqqq
604
handle_early_msgs(lists:reverse(Msgs), State3);
606
handle_call(disc_load_intents, From, State) ->
607
Tabs = disc_load_intents(State#state.loader_queue) ++
608
disc_load_intents(State#state.late_loader_queue),
609
ActiveTabs = mnesia_lib:local_active_tables(),
610
reply(From, {ok, node(), mnesia_lib:union(Tabs, ActiveTabs)}),
613
handle_call({update_where_to_write, [add, Tab, AddNode], _From}, _Dummy, State) ->
614
%%% dbg_out("update_w2w ~p", [[add, Tab, AddNode]]), %%% qqqq
615
Current = val({current, db_nodes}),
617
case lists:member(AddNode, Current) and
618
State#state.schema_is_merged == true of
620
mnesia_lib:add({Tab, where_to_write}, AddNode);
626
handle_call({add_active_replica, [Tab, ToNode, RemoteS, AccessMode], From},
628
KnownNode = lists:member(ToNode, val({current, db_nodes})),
629
Merged = State#state.schema_is_merged,
631
KnownNode == false ->
632
reply(ReplyTo, ignore),
635
Res = add_active_replica(Tab, ToNode, RemoteS, AccessMode),
638
true -> %% Schema is not merged
639
Msg = {add_active_replica, [Tab, ToNode, RemoteS, AccessMode], From},
640
Msgs = State#state.early_msgs,
641
reply(ReplyTo, ignore), %% Reply ignore and add data after schema merge
642
noreply(State#state{early_msgs = [{call, Msg, undefined} | Msgs]})
645
handle_call({unannounce_add_table_copy, [Tab, Node], From}, ReplyTo, State) ->
646
KnownNode = lists:member(node(From), val({current, db_nodes})),
647
Merged = State#state.schema_is_merged,
649
KnownNode == false ->
650
reply(ReplyTo, ignore),
653
Res = unannounce_add_table_copy(Tab, Node),
656
true -> %% Schema is not merged
657
Msg = {unannounce_add_table_copy, [Tab, Node], From},
658
Msgs = State#state.early_msgs,
659
reply(ReplyTo, ignore), %% Reply ignore and add data after schema merge
660
%% Set ReplyTO to undefined so we don't reply twice
661
noreply(State#state{early_msgs = [{call, Msg, undefined} | Msgs]})
664
handle_call(Msg, From, State) when State#state.schema_is_merged == false ->
665
%% Buffer early messages
666
%% dbg_out("Buffered early msg ~p ~n", [Msg]), %% qqqq
667
Msgs = State#state.early_msgs,
668
noreply(State#state{early_msgs = [{call, Msg, From} | Msgs]});
670
handle_call({net_load, Tab, Cs}, From, State) ->
671
Worker = #net_load{table = Tab,
673
reason = add_table_copy,
676
State2 = add_worker(Worker, State),
679
handle_call({late_disc_load, Tabs, Reason, RemoteLoaders}, From, State) ->
680
State2 = late_disc_load(Tabs, Reason, RemoteLoaders, From, State),
683
handle_call({block_table, [Tab], From}, _Dummy, State) ->
684
case lists:member(node(From), val({current, db_nodes})) of
692
handle_call({check_w2r, _Node, Tab}, _From, State) ->
693
{reply, val({Tab, where_to_read}), State};
695
handle_call(Msg, _From, State) ->
696
error("~p got unexpected call: ~p~n", [?SERVER_NAME, Msg]),
699
disc_load_intents([H | T]) when record(H, disc_load) ->
700
[H#disc_load.table | disc_load_intents(T)];
701
disc_load_intents([H | T]) when record(H, late_load) ->
702
[H#late_load.table | disc_load_intents(T)];
703
disc_load_intents( [H | T]) when record(H, net_load) ->
704
disc_load_intents(T);
705
disc_load_intents([]) ->
708
late_disc_load(TabsR, Reason, RemoteLoaders, From, State) ->
709
verbose("Intend to load tables: ~p~n", [TabsR]),
710
?eval_debug_fun({?MODULE, late_disc_load},
713
{loaders, RemoteLoaders}]),
716
%% RemoteLoaders is a list of {ok, Node, Tabs} tuples
718
%% Remove deleted tabs
719
LocalTabs = mnesia_lib:val({schema, local_tables}),
720
Filter = fun({Tab, Reas}, Acc) ->
721
case lists:member(Tab, LocalTabs) of
722
true -> [{Tab, Reas} | Acc];
726
case lists:member(Tab, LocalTabs) of
732
Tabs = lists:foldl(Filter, [], TabsR),
734
Nodes = val({current, db_nodes}),
735
LateLoaders = late_loaders(Tabs, Reason, RemoteLoaders, Nodes),
736
LateQueue = State#state.late_loader_queue ++ LateLoaders,
737
State#state{late_loader_queue = LateQueue}.
739
late_loaders([{Tab, Reason} | Tabs], DefaultReason, RemoteLoaders, Nodes) ->
740
LoadNodes = late_load_filter(RemoteLoaders, Tab, Nodes, []),
743
cast({disc_load, Tab, Reason}); % Ugly cast
747
LateLoad = #late_load{table = Tab, loaders = LoadNodes, reason = Reason},
748
[LateLoad | late_loaders(Tabs, DefaultReason, RemoteLoaders, Nodes)];
750
late_loaders([Tab | Tabs], Reason, RemoteLoaders, Nodes) ->
751
Loaders = late_load_filter(RemoteLoaders, Tab, Nodes, []),
754
cast({disc_load, Tab, Reason}); % Ugly cast
758
LateLoad = #late_load{table = Tab, loaders = Loaders, reason = Reason},
759
[LateLoad | late_loaders(Tabs, Reason, RemoteLoaders, Nodes)];
760
late_loaders([], _Reason, _RemoteLoaders, _Nodes) ->
763
late_load_filter([{error, _} | RemoteLoaders], Tab, Nodes, Acc) ->
764
late_load_filter(RemoteLoaders, Tab, Nodes, Acc);
765
late_load_filter([{badrpc, _} | RemoteLoaders], Tab, Nodes, Acc) ->
766
late_load_filter(RemoteLoaders, Tab, Nodes, Acc);
767
late_load_filter([RL | RemoteLoaders], Tab, Nodes, Acc) ->
768
{ok, Node, Intents} = RL,
769
Access = val({Tab, access_mode}),
770
LocalC = val({Tab, local_content}),
771
StillActive = lists:member(Node, Nodes),
772
RemoteIntent = lists:member(Tab, Intents),
774
Access == read_write,
777
RemoteIntent == true ->
778
Masters = mnesia_recover:get_master_nodes(Tab),
779
case lists:member(Node, Masters) of
781
%% The other node is master node for
782
%% the table, accept his load intent
783
late_load_filter(RemoteLoaders, Tab, Nodes, [Node | Acc]);
784
false when Masters == [] ->
785
%% The table has no master nodes
786
%% accept his load intent
787
late_load_filter(RemoteLoaders, Tab, Nodes, [Node | Acc]);
789
%% Some one else is master node for
790
%% the table, ignore his load intent
791
late_load_filter(RemoteLoaders, Tab, Nodes, Acc)
794
late_load_filter(RemoteLoaders, Tab, Nodes, Acc)
796
late_load_filter([], _Tab, _Nodes, Acc) ->
799
%%----------------------------------------------------------------------
800
%% Func: handle_cast/2
801
%% Returns: {noreply, State} |
802
%% {noreply, State, Timeout} |
803
%% {stop, Reason, State} (terminate/2 is called)
804
%%----------------------------------------------------------------------
806
handle_cast({release_schema_commit_lock, _Owner}, State) ->
808
State#state.is_stopping == true ->
809
{stop, shutdown, State};
811
case State#state.dumper_queue of
812
[#schema_commit_lock{}|Rest] ->
813
[_Worker | Rest] = State#state.dumper_queue,
814
State2 = State#state{dumper_pid = undefined,
815
dumper_queue = Rest},
816
State3 = opt_start_worker(State2),
823
handle_cast(unblock_controller, State) ->
825
State#state.is_stopping == true ->
826
{stop, shutdown, State};
827
record(hd(State#state.dumper_queue), block_controller) ->
828
[_Worker | Rest] = State#state.dumper_queue,
829
State2 = State#state{dumper_pid = undefined,
830
dumper_queue = Rest},
831
State3 = opt_start_worker(State2),
835
handle_cast({mnesia_down, Node}, State) ->
836
maybe_log_mnesia_down(Node),
837
mnesia_lib:del({current, db_nodes}, Node),
838
mnesia_checkpoint:tm_mnesia_down(Node),
839
Alltabs = val({schema, tables}),
840
State2 = reconfigure_tables(Node, State, Alltabs),
841
case State#state.sender_pid of
843
Pid when pid(Pid) -> Pid ! {copier_done, Node}
845
case State#state.loader_pid of
847
Pid2 when pid(Pid2) -> Pid2 ! {copier_done, Node}
850
case State#state.sender_queue of
851
[OldSender | RestSenders] ->
853
node(ST#send_table.receiver_pid) /= Node
855
NewS = lists:filter(Remove, RestSenders),
856
%% Keep old sender it will be removed by sender_done
861
Early = remove_early_messages(State2#state.early_msgs, Node),
862
mnesia_monitor:mnesia_down(?SERVER_NAME, Node),
863
noreply(State2#state{sender_queue = NewSenders, early_msgs = Early});
865
handle_cast({im_running, _Node, NewFriends}, State) ->
866
Tabs = mnesia_lib:local_active_tables() -- [schema],
867
Ns = mnesia_lib:intersect(NewFriends, val({current, db_nodes})),
868
abcast(Ns, {adopt_orphans, node(), Tabs}),
871
handle_cast(Msg, State) when State#state.schema_is_merged == false ->
872
%% Buffer early messages
873
Msgs = State#state.early_msgs,
874
noreply(State#state{early_msgs = [{cast, Msg} | Msgs]});
876
handle_cast({disc_load, Tab, Reason}, State) ->
877
Worker = #disc_load{table = Tab, reason = Reason},
878
State2 = add_worker(Worker, State),
881
handle_cast(Worker, State) when record(Worker, send_table) ->
882
State2 = add_worker(Worker, State),
885
handle_cast({sync_tabs, Tabs, From}, State) ->
886
%% user initiated wait_for_tables
887
handle_sync_tabs(Tabs, From),
890
handle_cast({i_have_tab, Tab, Node}, State) ->
891
case lists:member(Node, val({current, db_nodes})) of
893
State2 = node_has_tabs([Tab], Node, State),
899
handle_cast({force_load_updated, Tab}, State) ->
900
case val({Tab, active_replicas}) of
905
State2 = node_has_tabs([Tab], SomeNode, State),
909
handle_cast({master_nodes_updated, Tab, Masters}, State) ->
910
Active = val({Tab, active_replicas}),
912
case val({Tab, load_by_force}) of
920
mnesia_lib:intersect(Masters, Active)
928
State2 = node_has_tabs([Tab], SomeNode, State),
932
handle_cast({adopt_orphans, Node, Tabs}, State) ->
934
State2 = node_has_tabs(Tabs, Node, State),
936
%% Register the other node as up and running
937
mnesia_recover:log_mnesia_up(Node),
938
verbose("Logging mnesia_up ~w~n", [Node]),
939
mnesia_lib:report_system_event({mnesia_up, Node}),
941
%% Load orphan tables
942
LocalTabs = val({schema, local_tables}) -- [schema],
943
Nodes = val({current, db_nodes}),
944
{LocalOrphans, RemoteMasters} =
945
orphan_tables(LocalTabs, Node, Nodes, [], []),
946
Reason = {adopt_orphan, node()},
947
mnesia_late_loader:async_late_disc_load(node(), LocalOrphans, Reason),
952
[Tab || {Tab, Ns} <- RemoteMasters,
953
lists:member(N, Ns)],
954
mnesia_late_loader:maybe_async_late_disc_load(N, RemoteOrphans, Reason)
956
lists:foreach(Fun, Nodes),
958
Queue = State2#state.loader_queue,
959
State3 = State2#state{loader_queue = Queue},
962
handle_cast(Msg, State) ->
963
error("~p got unexpected cast: ~p~n", [?SERVER_NAME, Msg]),
966
handle_sync_tabs([Tab | Tabs], From) ->
967
case val({Tab, where_to_read}) of
969
case get({sync_tab, Tab}) of
971
put({sync_tab, Tab}, [From]);
973
put({sync_tab, Tab}, [From | Pids])
976
sync_reply(From, Tab)
978
handle_sync_tabs(Tabs, From);
979
handle_sync_tabs([], _From) ->
982
%%----------------------------------------------------------------------
983
%% Func: handle_info/2
984
%% Returns: {noreply, State} |
985
%% {noreply, State, Timeout} |
986
%% {stop, Reason, State} (terminate/2 is called)
987
%%----------------------------------------------------------------------
989
handle_info({async_dump_log, InitBy}, State) ->
990
Worker = #dump_log{initiated_by = InitBy},
991
State2 = add_worker(Worker, State),
994
handle_info(Done, State) when record(Done, dumper_done) ->
995
Pid = Done#dumper_done.worker_pid,
996
Res = Done#dumper_done.worker_res,
998
State#state.is_stopping == true ->
999
{stop, shutdown, State};
1000
Res == dumped, Pid == State#state.dumper_pid ->
1001
[Worker | Rest] = State#state.dumper_queue,
1002
reply(Worker#dump_log.opt_reply_to, Res),
1003
State2 = State#state{dumper_pid = undefined,
1004
dumper_queue = Rest},
1005
State3 = opt_start_worker(State2),
1008
fatal("Dumper failed: ~p~n state: ~p~n", [Res, State]),
1009
{stop, fatal, State}
1012
handle_info(Done, State) when record(Done, loader_done) ->
1015
Done#loader_done.worker_pid == State#state.loader_pid -> ok
1018
[_Worker | Rest] = LoadQ0 = State#state.loader_queue,
1019
LateQueue0 = State#state.late_loader_queue,
1020
{LoadQ, LateQueue} =
1021
case Done#loader_done.is_loaded of
1023
Tab = Done#loader_done.table_name,
1025
%% Optional user sync
1026
case Done#loader_done.needs_sync of
1027
true -> user_sync_tab(Tab);
1031
%% Optional table announcement
1032
case Done#loader_done.needs_announce of
1039
%% Local node needs to perform user_sync_tab/1
1040
Ns = val({current, db_nodes}),
1041
abcast(Ns, {i_have_tab, Tab, node()})
1048
%% Local node needs to perform user_sync_tab/1
1049
Ns = val({current, db_nodes}),
1050
AlreadyKnows = val({Tab, active_replicas}),
1051
abcast(Ns -- AlreadyKnows, {i_have_tab, Tab, node()})
1055
%% Optional client reply
1056
case Done#loader_done.needs_reply of
1058
reply(Done#loader_done.reply_to,
1059
Done#loader_done.reply);
1063
{Rest, reply_late_load(Tab, LateQueue0)};
1065
case Done#loader_done.reply of
1067
{LoadQ0, LateQueue0};
1073
State2 = State#state{loader_pid = undefined,
1074
loader_queue = LoadQ,
1075
late_loader_queue = LateQueue},
1077
State3 = opt_start_worker(State2),
1080
handle_info(Done, State) when record(Done, sender_done) ->
1081
Pid = Done#sender_done.worker_pid,
1082
Res = Done#sender_done.worker_res,
1084
Res == ok, Pid == State#state.sender_pid ->
1085
[Worker | Rest] = State#state.sender_queue,
1086
Worker#send_table.receiver_pid ! {copier_done, node()},
1087
State2 = State#state{sender_pid = undefined,
1088
sender_queue = Rest},
1089
State3 = opt_start_worker(State2),
1092
%% No need to send any message to the table receiver
1093
%% since it will soon get a mnesia_down anyway
1094
fatal("Sender failed: ~p~n state: ~p~n", [Res, State]),
1095
{stop, fatal, State}
1098
handle_info({'EXIT', Pid, R}, State) when Pid == State#state.supervisor ->
1099
catch set(mnesia_status, stopping),
1100
case State#state.dumper_pid of
1102
dbg_out("~p was ~p~n", [?SERVER_NAME, R]),
1103
{stop, shutdown, State};
1105
noreply(State#state{is_stopping = true})
1108
handle_info({'EXIT', Pid, R}, State) when Pid == State#state.dumper_pid ->
1109
case State#state.dumper_queue of
1110
[#schema_commit_lock{}|Workers] -> %% Schema trans crashed or was killed
1111
State2 = State#state{dumper_queue = Workers, dumper_pid = undefined},
1112
State3 = opt_start_worker(State2),
1115
fatal("Dumper or schema commit crashed: ~p~n state: ~p~n", [R, State]),
1116
{stop, fatal, State}
1119
handle_info({'EXIT', Pid, R}, State) when Pid == State#state.loader_pid ->
1120
fatal("Loader crashed: ~p~n state: ~p~n", [R, State]),
1121
{stop, fatal, State};
1123
handle_info({'EXIT', Pid, R}, State) when Pid == State#state.sender_pid ->
1124
%% No need to send any message to the table receiver
1125
%% since it will soon get a mnesia_down anyway
1126
fatal("Sender crashed: ~p~n state: ~p~n", [R, State]),
1127
{stop, fatal, State};
1129
handle_info({From, get_state}, State) ->
1130
From ! {?SERVER_NAME, State},
1133
%% No real need for buffering
1134
handle_info(Msg, State) when State#state.schema_is_merged == false ->
1135
%% Buffer early messages
1136
Msgs = State#state.early_msgs,
1137
noreply(State#state{early_msgs = [{info, Msg} | Msgs]});
1139
handle_info({'EXIT', Pid, wait_for_tables_timeout}, State) ->
1140
sync_tab_timeout(Pid, get()),
1143
handle_info(Msg, State) ->
1144
error("~p got unexpected info: ~p~n", [?SERVER_NAME, Msg]),
1147
reply_late_load(Tab, [H | T]) when H#late_load.table == Tab ->
1148
reply(H#late_load.opt_reply_to, ok),
1149
reply_late_load(Tab, T);
1150
reply_late_load(Tab, [H | T]) ->
1151
[H | reply_late_load(Tab, T)];
1152
reply_late_load(_Tab, []) ->
1155
sync_tab_timeout(Pid, [{{sync_tab, Tab}, Pids} | Tail]) ->
1156
case lists:delete(Pid, Pids) of
1158
erase({sync_tab, Tab});
1160
put({sync_tab, Tab}, Pids2)
1162
sync_tab_timeout(Pid, Tail);
1163
sync_tab_timeout(Pid, [_ | Tail]) ->
1164
sync_tab_timeout(Pid, Tail);
1165
sync_tab_timeout(_Pid, []) ->
1168
%% Pick the load record that has the highest load order
1169
%% Returns {BestLoad, RemainingQueue} or {none, []} if queue is empty
1171
pick_next(Queue, none, none, []).
1173
pick_next([Head | Tail], Load, Order, Rest) when record(Head, net_load) ->
1174
Tab = Head#net_load.table,
1175
select_best(Head, Tail, val({Tab, load_order}), Load, Order, Rest);
1176
pick_next([Head | Tail], Load, Order, Rest) when record(Head, disc_load) ->
1177
Tab = Head#disc_load.table,
1178
select_best(Head, Tail, val({Tab, load_order}), Load, Order, Rest);
1179
pick_next([], Load, _Order, Rest) ->
1182
%% Compare the current load candidate against the best one found so
%% far and continue scanning the queue via pick_next/4.
select_best(Load, Tail, Order, none, none, Rest) ->
    %% First candidate examined becomes the provisional best.
    pick_next(Tail, Load, Order, Rest);
select_best(Load, Tail, Order, Best, BestOrder, Rest) when Order > BestOrder ->
    %% New candidate has a higher load order; demote the previous best.
    pick_next(Tail, Load, Order, [Best | Rest]);
select_best(Load, Tail, _Order, Best, BestOrder, Rest) ->
    %% Keep the previous best; push the loser onto the remainder.
    pick_next(Tail, Best, BestOrder, [Load | Rest]).
1189
%%----------------------------------------------------------------------
%% Func: terminate/2
%% Purpose: Shutdown the server
%% Returns: any (ignored by gen_server)
%%----------------------------------------------------------------------
%% Delegate to mnesia_monitor so that process termination is reported
%% in a uniform way for all mnesia processes.
terminate(Reason, State) ->
    mnesia_monitor:terminate_proc(?SERVER_NAME, Reason, State).
1197
%%----------------------------------------------------------------------
1198
%% Func: code_change/3
1199
%% Purpose: Upgrade process when its code is to be changed
1200
%% Returns: {ok, NewState}
1201
%%----------------------------------------------------------------------
1202
code_change(_OldVsn, State, _Extra) ->
1205
%%%----------------------------------------------------------------------
1206
%%% Internal functions
1207
%%%----------------------------------------------------------------------
1209
maybe_log_mnesia_down(N) ->
1210
%% We use mnesia_down when deciding which tables to load locally,
1211
%% so if we are not running (i.e haven't decided which tables
1212
%% to load locally), don't log mnesia_down yet.
1213
case mnesia_lib:is_running() of
1215
verbose("Logging mnesia_down ~w~n", [N]),
1216
mnesia_recover:log_mnesia_down(N),
1219
Filter = fun(Tab) ->
1220
inactive_copy_holders(Tab, N)
1222
HalfLoadedTabs = lists:any(Filter, val({schema, local_tables}) -- [schema]),
1224
HalfLoadedTabs == true ->
1225
verbose("Logging mnesia_down ~w~n", [N]),
1226
mnesia_recover:log_mnesia_down(N),
1229
%% Unfortunately we have not loaded some common
1230
%% tables yet, so we cannot rely on the nodedown
1231
log_later %% BUGBUG handle this case!!!
1235
inactive_copy_holders(Tab, Node) ->
1236
Cs = val({Tab, cstruct}),
1237
case mnesia_lib:cs_to_storage_type(Node, Cs) of
1241
mnesia_lib:not_active_here(Tab)
1244
orphan_tables([Tab | Tabs], Node, Ns, Local, Remote) ->
1245
Cs = val({Tab, cstruct}),
1246
CopyHolders = mnesia_lib:copy_holders(Cs),
1247
RamCopyHolders = Cs#cstruct.ram_copies,
1248
DiscCopyHolders = CopyHolders -- RamCopyHolders,
1249
DiscNodes = val({schema, disc_copies}),
1250
LocalContent = Cs#cstruct.local_content,
1251
RamCopyHoldersOnDiscNodes = mnesia_lib:intersect(RamCopyHolders, DiscNodes),
1252
Active = val({Tab, active_replicas}),
1253
case lists:member(Node, DiscCopyHolders) of
1254
true when Active == [] ->
1255
case DiscCopyHolders -- Ns of
1257
%% We're last up and the other nodes have not
1258
%% loaded the table. Lets load it if we are
1259
%% the smallest node.
1260
case lists:min(DiscCopyHolders) of
1261
Min when Min == node() ->
1262
case mnesia_recover:get_master_nodes(Tab) of
1265
orphan_tables(Tabs, Node, Ns, L, Remote);
1267
R = [{Tab, Masters} | Remote],
1268
orphan_tables(Tabs, Node, Ns, Local, R)
1271
orphan_tables(Tabs, Node, Ns, Local, Remote)
1274
orphan_tables(Tabs, Node, Ns, Local, Remote)
1276
false when Active == [], DiscCopyHolders == [], RamCopyHoldersOnDiscNodes == [] ->
1277
%% Special case when all replicas resides on disc less nodes
1278
orphan_tables(Tabs, Node, Ns, [Tab | Local], Remote);
1279
_ when LocalContent == true ->
1280
orphan_tables(Tabs, Node, Ns, [Tab | Local], Remote);
1282
orphan_tables(Tabs, Node, Ns, Local, Remote)
1284
orphan_tables([], _, _, LocalOrphans, RemoteMasters) ->
1285
{LocalOrphans, RemoteMasters}.
1287
node_has_tabs([Tab | Tabs], Node, State) when Node /= node() ->
1288
State2 = update_whereabouts(Tab, Node, State),
1289
node_has_tabs(Tabs, Node, State2);
1290
node_has_tabs([Tab | Tabs], Node, State) ->
1292
node_has_tabs(Tabs, Node, State);
1293
node_has_tabs([], _Node, State) ->
1296
update_whereabouts(Tab, Node, State) ->
1297
Storage = val({Tab, storage_type}),
1298
Read = val({Tab, where_to_read}),
1299
LocalC = val({Tab, local_content}),
1300
BeingCreated = (?catch_val({Tab, create_table}) == true),
1301
Masters = mnesia_recover:get_master_nodes(Tab),
1302
ByForce = val({Tab, load_by_force}),
1310
lists:member(Node, Masters)
1313
dbg_out("Table ~w is loaded on ~w. s=~w, r=~w, lc=~w, f=~w, m=~w~n",
1314
[Tab, Node, Storage, Read, LocalC, ByForce, GoGetIt]),
1317
%% Local contents, don't care about other node
1319
Storage == unknown, Read == nowhere ->
1320
%% No own copy, time to read remotely
1321
%% if the other node is a good node
1322
add_active_replica(Tab, Node),
1325
set({Tab, where_to_read}, Node),
1331
Storage == unknown ->
1332
%% No own copy, continue to read remotely
1333
add_active_replica(Tab, Node),
1334
NodeST = mnesia_lib:storage_type_at_node(Node, Tab),
1335
ReadST = mnesia_lib:storage_type_at_node(Read, Tab),
1336
if %% Avoid reading from disc_only_copies
1337
NodeST == disc_only_copies ->
1339
ReadST == disc_only_copies ->
1340
mnesia_lib:set_remote_where_to_read(Tab);
1346
BeingCreated == true ->
1347
%% The table is currently being created
1348
%% and we shall have an own copy of it.
1349
%% We will load the (empty) table locally.
1350
add_active_replica(Tab, Node),
1353
%% Own copy, go and get a copy of the table
1354
%% if the other node is master or if there
1355
%% are no master at all
1356
add_active_replica(Tab, Node),
1359
Worker = #net_load{table = Tab,
1360
reason = {active_remote, Node}},
1361
add_worker(Worker, State);
1366
%% We already have an own copy
1367
add_active_replica(Tab, Node),
1372
initial_safe_loads() ->
1373
case val({schema, storage_type}) of
1376
Tabs = val({schema, local_tables}) -- [schema],
1377
LastC = fun(T) -> last_consistent_replica(T, Downs) end,
1378
lists:zf(LastC, Tabs);
1381
Downs = mnesia_recover:get_mnesia_downs(),
1382
dbg_out("mnesia_downs = ~p~n", [Downs]),
1384
Tabs = val({schema, local_tables}) -- [schema],
1385
LastC = fun(T) -> last_consistent_replica(T, Downs) end,
1386
lists:zf(LastC, Tabs)
1389
last_consistent_replica(Tab, Downs) ->
1390
Cs = val({Tab, cstruct}),
1391
Storage = mnesia_lib:cs_to_storage_type(node(), Cs),
1392
Ram = Cs#cstruct.ram_copies,
1393
Disc = Cs#cstruct.disc_copies,
1394
DiscOnly = Cs#cstruct.disc_only_copies,
1395
BetterCopies0 = mnesia_lib:remote_copy_holders(Cs) -- Downs,
1396
BetterCopies = BetterCopies0 -- Ram,
1397
AccessMode = Cs#cstruct.access_mode,
1398
Copies = mnesia_lib:copy_holders(Cs),
1399
Masters = mnesia_recover:get_master_nodes(Tab),
1400
LocalMaster0 = lists:member(node(), Masters),
1401
LocalContent = Cs#cstruct.local_content,
1404
Masters == [] -> false;
1405
true -> not LocalMaster0
1409
Masters == [] -> false;
1410
true -> LocalMaster0
1413
Copies == [node()] ->
1414
%% Only one copy holder and it is local.
1415
%% It may also be a local contents table
1416
{true, {Tab, local_only}};
1417
LocalContent == true ->
1418
{true, {Tab, local_content}};
1419
LocalMaster == true ->
1420
%% We have a local master
1421
{true, {Tab, local_master}};
1422
RemoteMaster == true ->
1423
%% Wait for remote master copy
1425
Storage == ram_copies ->
1427
Disc == [], DiscOnly == [] ->
1428
%% Nobody has copy on disc
1429
{true, {Tab, ram_only}};
1431
%% Some other node has copy on disc
1434
AccessMode == read_only ->
1435
%% No one has been able to update the table,
1436
%% i.e. all disc resident copies are equal
1437
{true, {Tab, read_only}};
1438
BetterCopies /= [], Masters /= [node()] ->
1439
%% There are better copies on other nodes
1440
%% and we do not have the only master copy
1443
{true, {Tab, initial}}
1446
reconfigure_tables(N, State, [Tab |Tail]) ->
1447
del_active_replica(Tab, N),
1448
case val({Tab, where_to_read}) of
1449
N -> mnesia_lib:set_remote_where_to_read(Tab);
1452
LateQ = drop_loaders(Tab, N, State#state.late_loader_queue),
1453
reconfigure_tables(N, State#state{late_loader_queue = LateQ}, Tail);
1455
reconfigure_tables(_, State, []) ->
1458
remove_early_messages([], _Node) ->
1460
remove_early_messages([{call, {add_active_replica, [_, Node, _, _], _}, _}|R], Node) ->
1461
remove_early_messages(R, Node); %% Does a reply before queuing
1462
remove_early_messages([{call, {block_table, _, From}, ReplyTo}|R], Node)
1463
when node(From) == Node ->
1464
reply(ReplyTo, ok), %% Remove gen:server waits..
1465
remove_early_messages(R, Node);
1466
remove_early_messages([{cast, {i_have_tab, _Tab, Node}}|R], Node) ->
1467
remove_early_messages(R, Node);
1468
remove_early_messages([{cast, {adopt_orphans, Node, _Tabs}}|R], Node) ->
1469
remove_early_messages(R, Node);
1470
remove_early_messages([M|R],Node) ->
1471
[M|remove_early_messages(R,Node)].
1473
%% Drop loader from late load queue and possibly trigger a disc_load
1474
drop_loaders(Tab, Node, [H | T]) when H#late_load.table == Tab ->
1475
%% Check if it is time to issue a disc_load request
1476
case H#late_load.loaders of
1478
Reason = {H#late_load.reason, last_loader_down, Node},
1479
cast({disc_load, Tab, Reason}); % Ugly cast
1483
%% Drop the node from the list of loaders
1484
H2 = H#late_load{loaders = H#late_load.loaders -- [Node]},
1485
[H2 | drop_loaders(Tab, Node, T)];
1486
drop_loaders(Tab, Node, [H | T]) ->
1487
[H | drop_loaders(Tab, Node, T)];
1488
drop_loaders(_, _, []) ->
1491
%% Announce Node as an active replica holder of Tab, looking up the
%% table definition to derive storage type and access mode.
add_active_replica(Tab, Node) ->
    Cs = val({Tab, cstruct}),
    add_active_replica(Tab, Node, Cs).
1494
%% Announce Node as an active replica holder of Tab, deriving the
%% storage type and access mode from the table definition Cs.
%% NOTE: the old-style guard test record/2 is deprecated (and rejected
%% by modern compilers); use the is_record/2 guard BIF instead.
add_active_replica(Tab, Node, Cs) when is_record(Cs, cstruct) ->
    Storage = mnesia_lib:schema_cs_to_storage_type(Node, Cs),
    AccessMode = Cs#cstruct.access_mode,
    add_active_replica(Tab, Node, Storage, AccessMode).
1499
%% Block table primitives
1502
Var = {Tab, where_to_commit},
1504
New = {blocked, Old},
1505
set(Var, New). % where_to_commit
1507
unblock_table(Tab) ->
1508
Var = {Tab, where_to_commit},
1516
set(Var, New). % where_to_commit
1518
is_tab_blocked(W2C) when list(W2C) ->
1520
is_tab_blocked({blocked, W2C}) when list(W2C) ->
1523
mark_blocked_tab(true, Value) ->
1525
mark_blocked_tab(false, Value) ->
1530
add_active_replica(Tab, Node, Storage, AccessMode) ->
1531
Var = {Tab, where_to_commit},
1532
{Blocked, Old} = is_tab_blocked(val(Var)),
1533
Del = lists:keydelete(Node, 1, Old),
1536
New = lists:sort([{Node, Storage} | Del]),
1537
set(Var, mark_blocked_tab(Blocked, New)), % where_to_commit
1538
add({Tab, where_to_write}, Node);
1540
set(Var, mark_blocked_tab(Blocked, Del)),
1541
mnesia_lib:del({Tab, where_to_write}, Node)
1543
add({Tab, active_replicas}, Node).
1545
%% Retract Node as an active replica holder of Tab: drop it from the
%% where_to_commit list (preserving a possible blocked marker) and from
%% the active_replicas and where_to_write variables.
del_active_replica(Tab, Node) ->
    CommitVar = {Tab, where_to_commit},
    {Blocked, Holders} = is_tab_blocked(val(CommitVar)),
    Remaining = lists:sort(lists:keydelete(Node, 1, Holders)),
    set(CommitVar, mark_blocked_tab(Blocked, Remaining)), % where_to_commit
    mnesia_lib:del({Tab, active_replicas}, Node),
    mnesia_lib:del({Tab, where_to_write}, Node).
1554
%% Re-announce each currently active replica of the table so that the
%% access mode carried by Cs is propagated via add_active_replica/3.
change_table_access_mode(Cs) ->
    Tab = Cs#cstruct.name,
    Announce = fun(Node) -> add_active_replica(Tab, Node, Cs) end,
    lists:foreach(Announce, val({Tab, active_replicas})).
1559
%% node To now has tab loaded, but this must be undone
1560
%% This code is rpc:call'ed from the tab_copier process
1561
%% when it has *not* released it's table lock
1562
unannounce_add_table_copy(Tab, To) ->
1563
del_active_replica(Tab, To),
1564
case val({Tab , where_to_read}) of
1566
mnesia_lib:set_remote_where_to_read(Tab);
1571
user_sync_tab(Tab) ->
1574
mnesia_subscr:subscribe(whereis(mnesia_event), {table, Tab});
1579
case erase({sync_tab, Tab}) of
1583
lists:foreach(fun(Pid) -> sync_reply(Pid, Tab) end, Pids)
1587
case val({Tab, local_content}) of
1589
mnesia_lib:set_local_content_whereabouts(Tab);
1591
set({Tab, where_to_read}, node())
1593
add_active_replica(Tab, node()).
1595
sync_and_block_table_whereabouts(Tab, ToNode, RemoteS, AccessMode) when Tab /= schema ->
1596
Current = val({current, db_nodes}),
1598
case lists:member(ToNode, Current) of
1599
true -> Current -- [ToNode];
1602
remote_call(ToNode, block_table, [Tab]),
1603
[remote_call(Node, add_active_replica, [Tab, ToNode, RemoteS, AccessMode]) ||
1604
Node <- [ToNode | Ns]],
1607
sync_del_table_copy_whereabouts(Tab, ToNode) when Tab /= schema ->
1608
Current = val({current, db_nodes}),
1610
case lists:member(ToNode, Current) of
1612
false -> [ToNode | Current]
1614
Args = [Tab, ToNode],
1615
[remote_call(Node, unannounce_add_table_copy, Args) || Node <- Ns],
1618
get_info(Timeout) ->
1619
case whereis(?SERVER_NAME) of
1623
Pid ! {self(), get_state},
1625
{?SERVER_NAME, State} when record(State, state) ->
1632
get_workers(Timeout) ->
1633
case whereis(?SERVER_NAME) of
1637
Pid ! {self(), get_state},
1639
{?SERVER_NAME, State} when record(State, state) ->
1640
{workers, State#state.loader_pid, State#state.sender_pid, State#state.dumper_pid}
1647
Tabs = mnesia_lib:local_active_tables(),
1648
io:format( "---> Active tables <--- ~n", []),
1651
info([Tab | Tail]) ->
1652
case val({Tab, storage_type}) of
1655
dets:info(Tab, size),
1656
dets:info(Tab, file_size),
1660
?ets_info(Tab, size),
1661
?ets_info(Tab, memory),
1666
info(Tab) -> info([Tab]).
1668
%% Print one padded info line for Tab: table name, record count,
%% memory figure and the media description string supplied by the caller.
info_format(Tab, Size, Mem, Media) ->
    NameCol = mnesia_lib:pad_name(atom_to_list(Tab), 15, []),
    SizeCol = mnesia_lib:pad_name(integer_to_list(Size), 8, []),
    MemCol = mnesia_lib:pad_name(integer_to_list(Mem), 8, []),
    io:format("~s: with ~s records occupying ~s ~s~n",
              [NameCol, SizeCol, MemCol, Media]).
1675
%% Handle early arrived messages
1676
handle_early_msgs([Msg | Msgs], State) ->
1677
%% The messages are in reverse order
1678
case handle_early_msg(Msg, State) of
1679
{stop, Reason, Reply, State2} ->
1680
{stop, Reason, Reply, State2};
1681
{stop, Reason, State2} ->
1682
{stop, Reason, State2};
1683
{noreply, State2} ->
1684
handle_early_msgs(Msgs, State2);
1685
{noreply, State2, _Timeout} ->
1686
handle_early_msgs(Msgs, State2);
1688
dbg_out("handle_early_msgs case clause ~p ~n", [Else]),
1689
erlang:error(Else, [[Msg | Msgs], State])
1691
handle_early_msgs([], State) ->
1694
%% Dispatch one buffered early message to the gen_server callback that
%% would have handled it, had it not arrived before we were ready.
handle_early_msg({call, Request, From}, State) ->
    handle_call(Request, From, State);
handle_early_msg({cast, Request}, State) ->
    handle_cast(Request, State);
handle_early_msg({info, Request}, State) ->
    handle_info(Request, State).
1704
reply(undefined, Reply) ->
1706
reply(ReplyTo, Reply) ->
1707
gen_server:reply(ReplyTo, Reply),
1710
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
1711
%% Worker management
1713
%% Returns new State
1714
add_worker(Worker, State) when record(Worker, dump_log) ->
1715
InitBy = Worker#dump_log.initiated_by,
1716
Queue = State#state.dumper_queue,
1717
case lists:keymember(InitBy, #dump_log.initiated_by, Queue) of
1720
true when Worker#dump_log.opt_reply_to == undefined ->
1721
%% The same threshold has been exceeded again,
1722
%% before we have had the possibility to
1723
%% process the older one.
1724
DetectedBy = {dump_log, InitBy},
1725
Event = {mnesia_overload, DetectedBy},
1726
mnesia_lib:report_system_event(Event)
1728
Queue2 = Queue ++ [Worker],
1729
State2 = State#state{dumper_queue = Queue2},
1730
opt_start_worker(State2);
1731
add_worker(Worker, State) when record(Worker, schema_commit_lock) ->
1732
Queue = State#state.dumper_queue,
1733
Queue2 = Queue ++ [Worker],
1734
State2 = State#state{dumper_queue = Queue2},
1735
opt_start_worker(State2);
1736
add_worker(Worker, State) when record(Worker, net_load) ->
1737
Queue = State#state.loader_queue,
1738
State2 = State#state{loader_queue = Queue ++ [Worker]},
1739
opt_start_worker(State2);
1740
add_worker(Worker, State) when record(Worker, send_table) ->
1741
Queue = State#state.sender_queue,
1742
State2 = State#state{sender_queue = Queue ++ [Worker]},
1743
opt_start_worker(State2);
1744
add_worker(Worker, State) when record(Worker, disc_load) ->
1745
Queue = State#state.loader_queue,
1746
State2 = State#state{loader_queue = Queue ++ [Worker]},
1747
opt_start_worker(State2);
1748
% Block controller should be used for upgrading mnesia.
1749
add_worker(Worker, State) when record(Worker, block_controller) ->
1750
Queue = State#state.dumper_queue,
1751
Queue2 = [Worker | Queue],
1752
State2 = State#state{dumper_queue = Queue2},
1753
opt_start_worker(State2).
1755
%% Optionally start a worker
1757
%% Dumpers and loaders may run simultaneously
1758
%% but neither of them may run during schema commit.
1759
%% Loaders may not start if a schema commit is enqueued.
1760
opt_start_worker(State) when State#state.is_stopping == true ->
1762
opt_start_worker(State) ->
1763
%% Prioritize dumper and schema commit
1764
%% by checking them first
1765
case State#state.dumper_queue of
1766
[Worker | _Rest] when State#state.dumper_pid == undefined ->
1767
%% Great, a worker in queue and neither
1768
%% a schema transaction is being
1769
%% committed and nor a dumper is running
1771
%% Start worker but keep him in the queue
1773
record(Worker, schema_commit_lock) ->
1774
ReplyTo = Worker#schema_commit_lock.owner,
1775
reply(ReplyTo, granted),
1776
{Owner, _Tag} = ReplyTo,
1777
State#state{dumper_pid = Owner};
1779
record(Worker, dump_log) ->
1780
Pid = spawn_link(?MODULE, dump_and_reply, [self(), Worker]),
1781
State2 = State#state{dumper_pid = Pid},
1783
%% If the worker was a dumper we may
1784
%% possibly be able to start a loader
1786
State3 = opt_start_sender(State2),
1787
opt_start_loader(State3);
1789
record(Worker, block_controller) ->
1790
case {State#state.sender_pid, State#state.loader_pid} of
1791
{undefined, undefined} ->
1792
ReplyTo = Worker#block_controller.owner,
1793
reply(ReplyTo, granted),
1794
{Owner, _Tag} = ReplyTo,
1795
State#state{dumper_pid = Owner};
1801
%% Bad luck, try with a loader or sender instead
1802
State2 = opt_start_sender(State),
1803
opt_start_loader(State2)
1806
opt_start_sender(State) ->
1807
case State#state.sender_queue of
1812
_ when State#state.sender_pid /= undefined ->
1813
%% Bad luck, a sender is already running
1816
[Sender | _SenderRest] ->
1817
case State#state.loader_queue of
1818
[Loader | _LoaderRest]
1819
when State#state.loader_pid /= undefined,
1820
Loader#net_load.table == Sender#send_table.table ->
1821
%% A conflicting loader is running
1824
SchemaQueue = State#state.dumper_queue,
1825
case lists:keymember(schema_commit, 1, SchemaQueue) of
1828
%% Start worker but keep him in the queue
1829
Pid = spawn_link(?MODULE, send_and_reply,
1831
State#state{sender_pid = Pid};
1833
%% Bad luck, we must wait for the schema commit
1839
opt_start_loader(State) ->
1840
LoaderQueue = State#state.loader_queue,
1842
LoaderQueue == [] ->
1846
State#state.loader_pid /= undefined ->
1847
%% Bad luck, an loader is already running
1851
SchemaQueue = State#state.dumper_queue,
1852
case lists:keymember(schema_commit, 1, SchemaQueue) of
1854
{Worker, Rest} = pick_next(LoaderQueue),
1856
%% Start worker but keep him in the queue
1857
Pid = spawn_link(?MODULE, load_and_reply, [self(), Worker]),
1858
State#state{loader_pid = Pid,
1859
loader_queue = [Worker | Rest]};
1861
%% Bad luck, we must wait for the schema commit
1866
%% Ask the controller on Node (fire-and-forget cast) to start sending
%% Tab to the given receiver process, using the stated remote storage.
start_remote_sender(Node, Tab, Receiver, Storage) ->
    gen_server:cast({?SERVER_NAME, Node},
                    #send_table{table = Tab,
                                receiver_pid = Receiver,
                                remote_storage = Storage}).
1872
dump_and_reply(ReplyTo, Worker) ->
1873
%% No trap_exit, die intentionally instead
1874
Res = mnesia_dumper:opt_dump_log(Worker#dump_log.initiated_by),
1875
ReplyTo ! #dumper_done{worker_pid = self(),
1880
send_and_reply(ReplyTo, Worker) ->
1881
%% No trap_exit, die intentionally instead
1882
Res = mnesia_loader:send_table(Worker#send_table.receiver_pid,
1883
Worker#send_table.table,
1884
Worker#send_table.remote_storage),
1885
ReplyTo ! #sender_done{worker_pid = self(),
1891
load_and_reply(ReplyTo, Worker) ->
1892
process_flag(trap_exit, true),
1893
Done = load_table(Worker),
1894
ReplyTo ! Done#loader_done{worker_pid = self()},
1898
%% Now it is time to load the table
1899
%% but first we must check if it still is neccessary
1900
load_table(Load) when record(Load, net_load) ->
1901
Tab = Load#net_load.table,
1902
ReplyTo = Load#net_load.opt_reply_to,
1903
Reason = Load#net_load.reason,
1904
LocalC = val({Tab, local_content}),
1905
AccessMode = val({Tab, access_mode}),
1906
ReadNode = val({Tab, where_to_read}),
1907
Active = filter_active(Tab),
1908
Done = #loader_done{is_loaded = true,
1910
needs_announce = false,
1914
reply = {loaded, ok}
1917
ReadNode == node() ->
1918
%% Already loaded locally
1921
Res = mnesia_loader:disc_load_table(Tab, load_local_content),
1922
Done#loader_done{reply = Res, needs_announce = true, needs_sync = true};
1923
AccessMode == read_only ->
1924
disc_load_table(Tab, Reason, ReplyTo);
1926
%% Either we cannot read the table yet
1927
%% or someone is moving a replica between
1929
Cs = Load#net_load.cstruct,
1930
Res = mnesia_loader:net_load_table(Tab, Reason, Active, Cs),
1933
Done#loader_done{needs_sync = true,
1935
{not_loaded, storage_unknown} ->
1936
Done#loader_done{reply = Res};
1938
Done#loader_done{is_loaded = false,
1939
needs_reply = false,
1944
load_table(Load) when record(Load, disc_load) ->
1945
Tab = Load#disc_load.table,
1946
Reason = Load#disc_load.reason,
1947
ReplyTo = Load#disc_load.opt_reply_to,
1948
ReadNode = val({Tab, where_to_read}),
1949
Active = filter_active(Tab),
1950
Done = #loader_done{is_loaded = true,
1952
needs_announce = false,
1957
Active == [], ReadNode == nowhere ->
1958
%% Not loaded anywhere, lets load it from disc
1959
disc_load_table(Tab, Reason, ReplyTo);
1960
ReadNode == nowhere ->
1961
%% Already loaded on other node, lets get it
1962
Cs = val({Tab, cstruct}),
1963
case mnesia_loader:net_load_table(Tab, Reason, Active, Cs) of
1965
Done#loader_done{needs_sync = true};
1966
{not_loaded, storage_unknown} ->
1967
Done#loader_done{is_loaded = false};
1968
{not_loaded, ErrReason} ->
1969
Done#loader_done{is_loaded = false,
1970
reply = {not_loaded,ErrReason}}
1973
%% Already readable, do not worry be happy
1977
disc_load_table(Tab, Reason, ReplyTo) ->
1978
Done = #loader_done{is_loaded = true,
1980
needs_announce = false,
1984
reply = {loaded, ok}
1986
Res = mnesia_loader:disc_load_table(Tab, Reason),
1988
Res == {loaded, ok} ->
1989
Done#loader_done{needs_announce = true,
1992
ReplyTo /= undefined ->
1993
Done#loader_done{is_loaded = false,
1996
fatal("Cannot load table ~p from disc: ~p~n", [Tab, Res])
1999
%% Determine which active replicas of Tab we may use, taking forced
%% loads and any configured master nodes into account.
%% Argument evaluation is left-to-right, matching the original
%% sequential lookups.
filter_active(Tab) ->
    do_filter_active(val({Tab, load_by_force}),
                     val({Tab, active_replicas}),
                     mnesia_recover:get_master_nodes(Tab)).
2005
do_filter_active(true, Active, _Masters) ->
2007
do_filter_active(false, Active, []) ->
2009
do_filter_active(false, Active, Masters) ->
2010
mnesia_lib:intersect(Active, Masters).