~ubuntu-branches/ubuntu/trusty/erlang/trusty

« back to all changes in this revision

Viewing changes to lib/mnesia/src/mnesia_controller.erl

  • Committer: Bazaar Package Importer
  • Author(s): Sergei Golovan
  • Date: 2009-06-11 12:18:07 UTC
  • mfrom: (1.2.2 upstream)
  • Revision ID: james.westby@ubuntu.com-20090611121807-ks7eb4xrt7dsysgx
Tags: 1:13.b.1-dfsg-1
* New upstream release.
* Removed unnecessary dependency of erlang-os-mon on erlang-observer and
  erlang-tools and added missing dependency of erlang-nox on erlang-os-mon
  (closes: #529512).
* Removed a patch to eunit application because the bug was fixed upstream.

Show diffs side-by-side

added added

removed removed

Lines of Context:
119
119
                is_stopping = false
120
120
               }).
121
121
%% Backwards Comp. Sender_pid is now a list of senders..
122
 
get_senders(#state{sender_pid = Pids}) when list(Pids) -> Pids.    
 
122
get_senders(#state{sender_pid = Pids}) when is_list(Pids) -> Pids.    
123
123
%% Backwards Comp. loader_pid is now a list of loaders..
124
 
get_loaders(#state{loader_pid = Pids}) when list(Pids) -> Pids.    
 
124
get_loaders(#state{loader_pid = Pids}) when is_list(Pids) -> Pids.    
125
125
max_loaders() ->
126
126
    case ?catch_val(no_table_loaders) of
127
127
        {'EXIT', _} -> 
200
200
%% If Mnesia stops, we will wait for Mnesia to restart
201
201
%% We will wait even if the list of tables is empty
202
202
%%
203
 
wait_for_tables(Tabs, Timeout) when list(Tabs), Timeout == infinity ->
 
203
wait_for_tables(Tabs, Timeout) when is_list(Tabs), Timeout == infinity ->
204
204
    do_wait_for_tables(Tabs, Timeout);
205
 
wait_for_tables(Tabs, Timeout) when list(Tabs),
206
 
                                    integer(Timeout), Timeout >= 0 ->
 
205
wait_for_tables(Tabs, Timeout) when is_list(Tabs),
 
206
                                    is_integer(Timeout), Timeout >= 0 ->
207
207
    do_wait_for_tables(Tabs, Timeout);
208
208
wait_for_tables(Tabs, Timeout) ->
209
209
    {error, {badarg, Tabs, Timeout}}.
227
227
    case catch mnesia_lib:active_tables() of
228
228
        {'EXIT', _} ->
229
229
            {error, {node_not_running, node()}};
230
 
        Active when list(Active) ->
 
230
        Active when is_list(Active) ->
231
231
            case Tabs -- Active of
232
232
                [] ->
233
233
                    ok;
248
248
        {'EXIT', _} ->
249
249
            %% Mnesia is not started
250
250
            {error, {node_not_running, node()}};
251
 
        true when pid(Init) ->
 
251
        true when is_pid(Init) ->
252
252
            cast({sync_tabs, Tabs, self()}),
253
253
            rec_tabs(Tabs, Tabs, From, Init)
254
254
    end.
349
349
    disc_load_table(Tab, {dumper,change_table_copy_type}, undefined).
350
350
 
351
351
%% Returns ok instead of yes
352
 
force_load_table(Tab) when atom(Tab), Tab /= schema ->
 
352
force_load_table(Tab) when is_atom(Tab), Tab /= schema ->
353
353
    case ?catch_val({Tab, storage_type}) of
354
354
        ram_copies ->
355
355
            do_force_load_table(Tab);
432
432
                        ok -> 
433
433
                            mnesia_lib:add_list(extra_db_nodes, New),
434
434
                            {ok, New};
435
 
                        {aborted, {throw, Str}} when list(Str) ->
 
435
                        {aborted, {throw, Str}} when is_list(Str) ->
436
436
                            %%mnesia_recover:disconnect_nodes(New),
437
437
                            {error, {merge_schema_failed, lists:flatten(Str)}};
438
438
                        Else ->
468
468
    case try_merge_schema(AllNodes) of
469
469
        ok -> 
470
470
            schema_is_merged();
471
 
        {aborted, {throw, Str}} when list(Str) ->
 
471
        {aborted, {throw, Str}} when is_list(Str) ->
472
472
            fatal("Failed to merge schema: ~s~n", [Str]);
473
473
        Else ->
474
474
            fatal("Failed to merge schema: ~p~n", [Else])
895
895
    if
896
896
        State#state.is_stopping == true ->
897
897
            {stop, shutdown, State};
898
 
        record(hd(State#state.dumper_queue), block_controller) ->
 
898
        is_record(hd(State#state.dumper_queue), block_controller) ->
899
899
            [_Worker | Rest] = State#state.dumper_queue,
900
900
            State2 = State#state{dumper_pid = undefined,
901
901
                                 dumper_queue = Rest},
969
969
%% might trigger a table load from wrong nodes as a result of that we don't 
970
970
%% know which tables we can load safly first.
971
971
handle_cast({im_running, _Node, NewFriends}, State) ->
972
 
    Tabs = mnesia_lib:local_active_tables() -- [schema],
 
972
    LocalTabs = mnesia_lib:local_active_tables() -- [schema],
 
973
    RemoveLocalOnly = fun(Tab) -> not val({Tab, local_content}) end,
 
974
    Tabs = lists:filter(RemoveLocalOnly, LocalTabs),
973
975
    Ns = mnesia_lib:intersect(NewFriends, val({current, db_nodes})),
974
976
    abcast(Ns, {adopt_orphans, node(), Tabs}),
975
977
    noreply(State);
979
981
    State2 = add_worker(Worker, State),
980
982
    noreply(State2);
981
983
 
982
 
handle_cast(Worker, State) when record(Worker, send_table) ->
 
984
handle_cast(Worker = #send_table{}, State) ->
983
985
    State2 = add_worker(Worker, State),
984
986
    noreply(State2);
985
987
 
1089
1091
    State2 = add_worker(Worker, State),
1090
1092
    noreply(State2);
1091
1093
 
1092
 
handle_info(Done, State) when record(Done, dumper_done) ->
1093
 
    Pid = Done#dumper_done.worker_pid,
1094
 
    Res = Done#dumper_done.worker_res,
 
1094
handle_info(#dumper_done{worker_pid=Pid, worker_res=Res}, State) ->
1095
1095
    if
1096
1096
        State#state.is_stopping == true ->
1097
1097
            {stop, shutdown, State};
1107
1107
            {stop, fatal, State}
1108
1108
    end;
1109
1109
 
1110
 
handle_info(Done, State0) when record(Done, loader_done) ->    
1111
 
    WPid = Done#loader_done.worker_pid,
 
1110
handle_info(Done = #loader_done{worker_pid=WPid, table_name=Tab}, State0) ->    
1112
1111
    LateQueue0 = State0#state.late_loader_queue,
1113
 
    Tab = Done#loader_done.table_name,
1114
1112
    State1 = State0#state{loader_pid = lists:keydelete(WPid,1,get_loaders(State0))},
1115
1113
 
1116
1114
    State2 =
1170
1168
    State3 = opt_start_worker(State2),
1171
1169
    noreply(State3);
1172
1170
 
1173
 
handle_info(Done, State) when record(Done, sender_done) ->
1174
 
    Pid = Done#sender_done.worker_pid,
1175
 
    Res = Done#sender_done.worker_res,
 
1171
handle_info(#sender_done{worker_pid=Pid, worker_res=Res}, State)  ->
1176
1172
    Senders = get_senders(State),
1177
1173
    {value, {Pid,_Worker}} = lists:keysearch(Pid, 1, Senders),
1178
1174
    if
1267
1263
        {Tab, Worker} -> {Worker, gb_trees:delete(Tab,Queue)}
1268
1264
    end.
1269
1265
 
1270
 
pick_next([Head | Tail], Load, Order) when record(Head, net_load) ->
1271
 
    Tab = Head#net_load.table,
 
1266
pick_next([Head = #net_load{table=Tab}| Tail], Load, Order) ->
1272
1267
    select_best(Head, Tail, ?catch_val({Tab, load_order}), Load, Order);
1273
 
pick_next([Head | Tail], Load, Order) when record(Head, disc_load) ->
1274
 
    Tab = Head#disc_load.table,
 
1268
pick_next([Head = #disc_load{table=Tab}| Tail], Load, Order) ->
1275
1269
    select_best(Head, Tail, ?catch_val({Tab, load_order}), Load, Order);
1276
1270
pick_next([], none, _Order) ->
1277
1271
    none;
1629
1623
add_active_replica(Tab, Node) ->
1630
1624
    add_active_replica(Tab, Node, val({Tab, cstruct})).
1631
1625
 
1632
 
add_active_replica(Tab, Node, Cs) when record(Cs, cstruct) ->
 
1626
add_active_replica(Tab, Node, Cs = #cstruct{}) ->
1633
1627
    Storage = mnesia_lib:schema_cs_to_storage_type(Node, Cs),
1634
1628
    AccessMode = Cs#cstruct.access_mode,
1635
1629
    add_active_replica(Tab, Node, Storage, AccessMode).
1645
1639
unblock_table(Tab) ->
1646
1640
    call({unblock_table, Tab}).
1647
1641
 
1648
 
is_tab_blocked(W2C) when list(W2C) ->
 
1642
is_tab_blocked(W2C) when is_list(W2C) ->
1649
1643
    {false, W2C};
1650
 
is_tab_blocked({blocked, W2C}) when list(W2C) ->
 
1644
is_tab_blocked({blocked, W2C}) when is_list(W2C) ->
1651
1645
    {true, W2C}.
1652
1646
 
1653
1647
mark_blocked_tab(true, Value) -> 
1771
1765
        Pid ->
1772
1766
            Pid ! {self(), get_state},
1773
1767
            receive
1774
 
                {?SERVER_NAME, State} when record(State, state) ->
 
1768
                {?SERVER_NAME, State = #state{}} ->
1775
1769
                    {workers, get_loaders(State), get_senders(State), State#state.dumper_pid}
1776
1770
            after Timeout ->
1777
1771
                    {timeout, Timeout}
1845
1839
%% Worker management
1846
1840
 
1847
1841
%% Returns new State
1848
 
add_worker(Worker, State) when record(Worker, dump_log) ->
 
1842
add_worker(Worker = #dump_log{}, State) ->
1849
1843
    InitBy = Worker#dump_log.initiated_by,
1850
1844
    Queue = State#state.dumper_queue,
1851
1845
    case lists:keymember(InitBy, #dump_log.initiated_by, Queue) of
1862
1856
    Queue2 = Queue ++ [Worker],
1863
1857
    State2 = State#state{dumper_queue = Queue2},
1864
1858
    opt_start_worker(State2);
1865
 
add_worker(Worker, State) when record(Worker, schema_commit_lock) ->
 
1859
add_worker(Worker = #schema_commit_lock{}, State) ->
1866
1860
    Queue = State#state.dumper_queue,
1867
1861
    Queue2 = Queue ++ [Worker],
1868
1862
    State2 = State#state{dumper_queue = Queue2},
1869
1863
    opt_start_worker(State2);
1870
 
add_worker(Worker, State) when record(Worker, net_load) ->
 
1864
add_worker(Worker = #net_load{}, State) ->
1871
1865
    opt_start_worker(add_loader(Worker#net_load.table,Worker,State));
1872
 
add_worker(Worker, State) when record(Worker, send_table) ->
 
1866
add_worker(Worker = #send_table{}, State) ->
1873
1867
    Queue = State#state.sender_queue,
1874
1868
    State2 = State#state{sender_queue = Queue ++ [Worker]},
1875
1869
    opt_start_worker(State2);
1876
 
add_worker(Worker, State) when record(Worker, disc_load) ->
 
1870
add_worker(Worker = #disc_load{}, State) ->
1877
1871
    opt_start_worker(add_loader(Worker#disc_load.table,Worker,State));
1878
1872
% Block controller should be used for upgrading mnesia.
1879
 
add_worker(Worker, State) when record(Worker, block_controller) -> 
 
1873
add_worker(Worker = #block_controller{}, State) -> 
1880
1874
    Queue = State#state.dumper_queue,
1881
1875
    Queue2 = [Worker | Queue],
1882
1876
    State2 = State#state{dumper_queue = Queue2},
1908
1902
                    
1909
1903
            %% Start worker but keep him in the queue
1910
1904
            if
1911
 
                record(Worker, schema_commit_lock) ->
 
1905
                is_record(Worker, schema_commit_lock) ->
1912
1906
                    ReplyTo = Worker#schema_commit_lock.owner,
1913
1907
                    reply(ReplyTo, granted),
1914
1908
                    {Owner, _Tag} = ReplyTo,
1915
1909
                    opt_start_loader(State#state{dumper_pid = Owner});
1916
1910
                
1917
 
                record(Worker, dump_log) ->
 
1911
                is_record(Worker, dump_log) ->
1918
1912
                    Pid = spawn_link(?MODULE, dump_and_reply, [self(), Worker]),
1919
1913
                    State2 = State#state{dumper_pid = Pid},
1920
1914
 
1924
1918
                    State3 = opt_start_sender(State2),
1925
1919
                    opt_start_loader(State3);
1926
1920
                
1927
 
                record(Worker, block_controller) ->
 
1921
                is_record(Worker, block_controller) ->
1928
1922
                    case {get_senders(State), get_loaders(State)} of
1929
1923
                        {[], []} ->
1930
1924
                            ReplyTo = Worker#block_controller.owner,
2052
2046
 
2053
2047
%% Now it is time to load the table
2054
2048
%% but first we must check if it still is neccessary
2055
 
load_table_fun(Load) when record(Load, net_load) ->
2056
 
    Tab = Load#net_load.table,
2057
 
    ReplyTo = Load#net_load.opt_reply_to,
2058
 
    Reason =  Load#net_load.reason,
 
2049
load_table_fun(#net_load{cstruct=Cs, table=Tab, reason=Reason, opt_reply_to=ReplyTo}) ->
2059
2050
    LocalC = val({Tab, local_content}),
2060
2051
    AccessMode = val({Tab, access_mode}),
2061
2052
    ReadNode = val({Tab, where_to_read}),
2084
2075
                    %% Either we cannot read the table yet
2085
2076
                    %% or someone is moving a replica between
2086
2077
                    %% two nodes
2087
 
                    Cs =  Load#net_load.cstruct,
2088
2078
                    Res = mnesia_loader:net_load_table(Tab, Reason, Active, Cs),
2089
2079
                    case Res of
2090
2080
                        {loaded, ok} ->
2096
2086
                    end
2097
2087
            end
2098
2088
    end;
2099
 
load_table_fun(Load) when record(Load, disc_load) ->
2100
 
    Tab = Load#disc_load.table,
2101
 
    Reason =  Load#disc_load.reason,
2102
 
    ReplyTo = Load#disc_load.opt_reply_to,
 
2089
load_table_fun(#disc_load{table=Tab, reason=Reason, opt_reply_to=ReplyTo}) ->
2103
2090
    ReadNode = val({Tab, where_to_read}),
2104
2091
    Active = filter_active(Tab),
2105
2092
    Done = #loader_done{is_loaded = true,