119
119
is_stopping = false
121
121
%% Backwards compatibility: sender_pid is now a list of senders.
122
get_senders(#state{sender_pid = Pids}) when list(Pids) -> Pids.
122
get_senders(#state{sender_pid = Pids}) when is_list(Pids) -> Pids.
123
123
%% Backwards compatibility: loader_pid is now a list of loaders.
124
get_loaders(#state{loader_pid = Pids}) when list(Pids) -> Pids.
124
get_loaders(#state{loader_pid = Pids}) when is_list(Pids) -> Pids.
126
126
case ?catch_val(no_table_loaders) of
200
200
%% If Mnesia stops, we will wait for Mnesia to restart
201
201
%% We will wait even if the list of tables is empty
203
wait_for_tables(Tabs, Timeout) when list(Tabs), Timeout == infinity ->
203
wait_for_tables(Tabs, Timeout) when is_list(Tabs), Timeout == infinity ->
204
204
do_wait_for_tables(Tabs, Timeout);
205
wait_for_tables(Tabs, Timeout) when list(Tabs),
206
integer(Timeout), Timeout >= 0 ->
205
wait_for_tables(Tabs, Timeout) when is_list(Tabs),
206
is_integer(Timeout), Timeout >= 0 ->
207
207
do_wait_for_tables(Tabs, Timeout);
208
208
wait_for_tables(Tabs, Timeout) ->
209
209
{error, {badarg, Tabs, Timeout}}.
227
227
case catch mnesia_lib:active_tables() of
229
229
{error, {node_not_running, node()}};
230
Active when list(Active) ->
230
Active when is_list(Active) ->
231
231
case Tabs -- Active of
249
249
%% Mnesia is not started
250
250
{error, {node_not_running, node()}};
251
true when pid(Init) ->
251
true when is_pid(Init) ->
252
252
cast({sync_tabs, Tabs, self()}),
253
253
rec_tabs(Tabs, Tabs, From, Init)
349
349
disc_load_table(Tab, {dumper,change_table_copy_type}, undefined).
351
351
%% Returns ok instead of yes
352
force_load_table(Tab) when atom(Tab), Tab /= schema ->
352
force_load_table(Tab) when is_atom(Tab), Tab /= schema ->
353
353
case ?catch_val({Tab, storage_type}) of
355
355
do_force_load_table(Tab);
433
433
mnesia_lib:add_list(extra_db_nodes, New),
435
{aborted, {throw, Str}} when list(Str) ->
435
{aborted, {throw, Str}} when is_list(Str) ->
436
436
%%mnesia_recover:disconnect_nodes(New),
437
437
{error, {merge_schema_failed, lists:flatten(Str)}};
468
468
case try_merge_schema(AllNodes) of
470
470
schema_is_merged();
471
{aborted, {throw, Str}} when list(Str) ->
471
{aborted, {throw, Str}} when is_list(Str) ->
472
472
fatal("Failed to merge schema: ~s~n", [Str]);
474
474
fatal("Failed to merge schema: ~p~n", [Else])
896
896
State#state.is_stopping == true ->
897
897
{stop, shutdown, State};
898
record(hd(State#state.dumper_queue), block_controller) ->
898
is_record(hd(State#state.dumper_queue), block_controller) ->
899
899
[_Worker | Rest] = State#state.dumper_queue,
900
900
State2 = State#state{dumper_pid = undefined,
901
901
dumper_queue = Rest},
969
969
%% might trigger a table load from wrong nodes as a result of that we don't
970
970
%% know which tables we can load safely first.
971
971
handle_cast({im_running, _Node, NewFriends}, State) ->
972
Tabs = mnesia_lib:local_active_tables() -- [schema],
972
LocalTabs = mnesia_lib:local_active_tables() -- [schema],
973
RemoveLocalOnly = fun(Tab) -> not val({Tab, local_content}) end,
974
Tabs = lists:filter(RemoveLocalOnly, LocalTabs),
973
975
Ns = mnesia_lib:intersect(NewFriends, val({current, db_nodes})),
974
976
abcast(Ns, {adopt_orphans, node(), Tabs}),
979
981
State2 = add_worker(Worker, State),
982
handle_cast(Worker, State) when record(Worker, send_table) ->
984
handle_cast(Worker = #send_table{}, State) ->
983
985
State2 = add_worker(Worker, State),
1089
1091
State2 = add_worker(Worker, State),
1090
1092
noreply(State2);
1092
handle_info(Done, State) when record(Done, dumper_done) ->
1093
Pid = Done#dumper_done.worker_pid,
1094
Res = Done#dumper_done.worker_res,
1094
handle_info(#dumper_done{worker_pid=Pid, worker_res=Res}, State) ->
1096
1096
State#state.is_stopping == true ->
1097
1097
{stop, shutdown, State};
1107
1107
{stop, fatal, State}
1110
handle_info(Done, State0) when record(Done, loader_done) ->
1111
WPid = Done#loader_done.worker_pid,
1110
handle_info(Done = #loader_done{worker_pid=WPid, table_name=Tab}, State0) ->
1112
1111
LateQueue0 = State0#state.late_loader_queue,
1113
Tab = Done#loader_done.table_name,
1114
1112
State1 = State0#state{loader_pid = lists:keydelete(WPid,1,get_loaders(State0))},
1170
1168
State3 = opt_start_worker(State2),
1171
1169
noreply(State3);
1173
handle_info(Done, State) when record(Done, sender_done) ->
1174
Pid = Done#sender_done.worker_pid,
1175
Res = Done#sender_done.worker_res,
1171
handle_info(#sender_done{worker_pid=Pid, worker_res=Res}, State) ->
1176
1172
Senders = get_senders(State),
1177
1173
{value, {Pid,_Worker}} = lists:keysearch(Pid, 1, Senders),
1267
1263
{Tab, Worker} -> {Worker, gb_trees:delete(Tab,Queue)}
1270
pick_next([Head | Tail], Load, Order) when record(Head, net_load) ->
1271
Tab = Head#net_load.table,
1266
pick_next([Head = #net_load{table=Tab}| Tail], Load, Order) ->
1272
1267
select_best(Head, Tail, ?catch_val({Tab, load_order}), Load, Order);
1273
pick_next([Head | Tail], Load, Order) when record(Head, disc_load) ->
1274
Tab = Head#disc_load.table,
1268
pick_next([Head = #disc_load{table=Tab}| Tail], Load, Order) ->
1275
1269
select_best(Head, Tail, ?catch_val({Tab, load_order}), Load, Order);
1276
1270
pick_next([], none, _Order) ->
1629
1623
add_active_replica(Tab, Node) ->
1630
1624
add_active_replica(Tab, Node, val({Tab, cstruct})).
1632
add_active_replica(Tab, Node, Cs) when record(Cs, cstruct) ->
1626
add_active_replica(Tab, Node, Cs = #cstruct{}) ->
1633
1627
Storage = mnesia_lib:schema_cs_to_storage_type(Node, Cs),
1634
1628
AccessMode = Cs#cstruct.access_mode,
1635
1629
add_active_replica(Tab, Node, Storage, AccessMode).
1645
1639
unblock_table(Tab) ->
1646
1640
call({unblock_table, Tab}).
1648
is_tab_blocked(W2C) when list(W2C) ->
1642
is_tab_blocked(W2C) when is_list(W2C) ->
1650
is_tab_blocked({blocked, W2C}) when list(W2C) ->
1644
is_tab_blocked({blocked, W2C}) when is_list(W2C) ->
1653
1647
mark_blocked_tab(true, Value) ->
1772
1766
Pid ! {self(), get_state},
1774
{?SERVER_NAME, State} when record(State, state) ->
1768
{?SERVER_NAME, State = #state{}} ->
1775
1769
{workers, get_loaders(State), get_senders(State), State#state.dumper_pid}
1776
1770
after Timeout ->
1777
1771
{timeout, Timeout}
1845
1839
%% Worker management
1847
1841
%% Returns new State
1848
add_worker(Worker, State) when record(Worker, dump_log) ->
1842
add_worker(Worker = #dump_log{}, State) ->
1849
1843
InitBy = Worker#dump_log.initiated_by,
1850
1844
Queue = State#state.dumper_queue,
1851
1845
case lists:keymember(InitBy, #dump_log.initiated_by, Queue) of
1862
1856
Queue2 = Queue ++ [Worker],
1863
1857
State2 = State#state{dumper_queue = Queue2},
1864
1858
opt_start_worker(State2);
1865
add_worker(Worker, State) when record(Worker, schema_commit_lock) ->
1859
add_worker(Worker = #schema_commit_lock{}, State) ->
1866
1860
Queue = State#state.dumper_queue,
1867
1861
Queue2 = Queue ++ [Worker],
1868
1862
State2 = State#state{dumper_queue = Queue2},
1869
1863
opt_start_worker(State2);
1870
add_worker(Worker, State) when record(Worker, net_load) ->
1864
add_worker(Worker = #net_load{}, State) ->
1871
1865
opt_start_worker(add_loader(Worker#net_load.table,Worker,State));
1872
add_worker(Worker, State) when record(Worker, send_table) ->
1866
add_worker(Worker = #send_table{}, State) ->
1873
1867
Queue = State#state.sender_queue,
1874
1868
State2 = State#state{sender_queue = Queue ++ [Worker]},
1875
1869
opt_start_worker(State2);
1876
add_worker(Worker, State) when record(Worker, disc_load) ->
1870
add_worker(Worker = #disc_load{}, State) ->
1877
1871
opt_start_worker(add_loader(Worker#disc_load.table,Worker,State));
1878
1872
% Block controller should be used for upgrading mnesia.
1879
add_worker(Worker, State) when record(Worker, block_controller) ->
1873
add_worker(Worker = #block_controller{}, State) ->
1880
1874
Queue = State#state.dumper_queue,
1881
1875
Queue2 = [Worker | Queue],
1882
1876
State2 = State#state{dumper_queue = Queue2},
1909
1903
%% Start worker but keep him in the queue
1911
record(Worker, schema_commit_lock) ->
1905
is_record(Worker, schema_commit_lock) ->
1912
1906
ReplyTo = Worker#schema_commit_lock.owner,
1913
1907
reply(ReplyTo, granted),
1914
1908
{Owner, _Tag} = ReplyTo,
1915
1909
opt_start_loader(State#state{dumper_pid = Owner});
1917
record(Worker, dump_log) ->
1911
is_record(Worker, dump_log) ->
1918
1912
Pid = spawn_link(?MODULE, dump_and_reply, [self(), Worker]),
1919
1913
State2 = State#state{dumper_pid = Pid},
1924
1918
State3 = opt_start_sender(State2),
1925
1919
opt_start_loader(State3);
1927
record(Worker, block_controller) ->
1921
is_record(Worker, block_controller) ->
1928
1922
case {get_senders(State), get_loaders(State)} of
1930
1924
ReplyTo = Worker#block_controller.owner,
2053
2047
%% Now it is time to load the table
2054
2048
%% but first we must check if it is still necessary
2055
load_table_fun(Load) when record(Load, net_load) ->
2056
Tab = Load#net_load.table,
2057
ReplyTo = Load#net_load.opt_reply_to,
2058
Reason = Load#net_load.reason,
2049
load_table_fun(#net_load{cstruct=Cs, table=Tab, reason=Reason, opt_reply_to=ReplyTo}) ->
2059
2050
LocalC = val({Tab, local_content}),
2060
2051
AccessMode = val({Tab, access_mode}),
2061
2052
ReadNode = val({Tab, where_to_read}),
2084
2075
%% Either we cannot read the table yet
2085
2076
%% or someone is moving a replica between
2087
Cs = Load#net_load.cstruct,
2088
2078
Res = mnesia_loader:net_load_table(Tab, Reason, Active, Cs),
2090
2080
{loaded, ok} ->
2099
load_table_fun(Load) when record(Load, disc_load) ->
2100
Tab = Load#disc_load.table,
2101
Reason = Load#disc_load.reason,
2102
ReplyTo = Load#disc_load.opt_reply_to,
2089
load_table_fun(#disc_load{table=Tab, reason=Reason, opt_reply_to=ReplyTo}) ->
2103
2090
ReadNode = val({Tab, where_to_read}),
2104
2091
Active = filter_active(Tab),
2105
2092
Done = #loader_done{is_loaded = true,