~ubuntu-branches/debian/squeeze/erlang/squeeze

« back to all changes in this revision

Viewing changes to lib/mnesia/src/mnesia_controller.erl

  • Committer: Bazaar Package Importer
  • Author(s): Erlang Packagers, Sergei Golovan
  • Date: 2006-12-03 17:07:44 UTC
  • mfrom: (2.1.11 feisty)
  • Revision ID: james.westby@ubuntu.com-20061203170744-rghjwupacqlzs6kv
Tags: 1:11.b.2-4
[ Sergei Golovan ]
Fixed erlang-base and erlang-base-hipe prerm scripts.

Show diffs side-by-side

added added

removed removed

Lines of Context:
66
66
         add_active_replica/2,
67
67
         add_active_replica/3,
68
68
         add_active_replica/4,
 
69
         update/1,
69
70
         change_table_access_mode/1,
70
71
         del_active_replica/2,
71
72
         wait_for_tables/2,
103
104
-record(state, {supervisor,
104
105
                schema_is_merged = false,
105
106
                early_msgs = [],
106
 
                loader_pid,
107
 
                loader_queue = [],
108
 
                sender_pid = [],     %% Was a pid or undef is now a list pids.
 
107
                loader_pid = [],     %% Was Pid is now [{Pid,Work}|..] 
 
108
                loader_queue,        %% Was list is now gb_tree
 
109
                sender_pid = [],     %% Was a pid or undef is now [{Pid,Work}|..] 
109
110
                sender_queue =  [],
110
 
                late_loader_queue = [],
111
 
                dumper_pid,          % Dumper or schema commit pid
112
 
                dumper_queue = [],   % Dumper or schema commit queue
113
 
                others = [],         % Processes that needs the copier_done msg
 
111
                late_loader_queue,   %% Was list is now gb_tree
 
112
                dumper_pid,          %% Dumper or schema commit pid
 
113
                dumper_queue = [],   %% Dumper or schema commit queue
 
114
                others = [],         %% Processes that needs the copier_done msg
114
115
                dump_log_timer_ref,
115
116
                is_stopping = false
116
117
               }).
117
118
%% Backwards Comp. Sender_pid is now a list of senders..
118
 
get_senders(#state{sender_pid = undefined}) -> [];
119
 
get_senders(#state{sender_pid = Pid}) when pid(Pid) -> [Pid];
120
119
get_senders(#state{sender_pid = Pids}) when list(Pids) -> Pids.    
121
 
 
122
 
-record(worker_reply, {what,
123
 
                       pid,
124
 
                       result
125
 
                      }).
 
120
%% Backwards Comp. loader_pid is now a list of loaders..
 
121
get_loaders(#state{loader_pid = Pids}) when list(Pids) -> Pids.    
 
122
max_loaders() ->
 
123
    case ?catch_val(no_table_loaders) of
 
124
        {'EXIT', _} -> 
 
125
            mnesia_lib:set(no_table_loaders,1),
 
126
            1;
 
127
        Val -> Val
 
128
    end.
126
129
 
127
130
-record(schema_commit_lock, {owner}).
128
131
-record(block_controller, {owner}).
209
212
    receive
210
213
        {?SERVER_NAME, Pid, Res} ->
211
214
            Res;
212
 
 
213
215
        {'EXIT', Pid, _} ->
214
216
            reply_wait(Tabs)
215
 
 
216
217
    after Timeout ->
217
218
            unlink(Pid),
218
219
            exit(Pid, timeout),
273
274
get_cstructs() ->
274
275
    call(get_cstructs).
275
276
 
 
277
update(Fun) ->
 
278
    call({update,Fun}).
 
279
 
 
280
 
276
281
mnesia_down(Node) ->
277
282
    case cast({mnesia_down, Node}) of
278
283
        {error, _} -> mnesia_monitor:mnesia_down(?SERVER_NAME, Node);
302
307
    call({add_other, self()}),
303
308
    Reason = {dumper,add_table_copy},
304
309
    Work = #net_load{table = Tab,reason = Reason,cstruct = Cs},
305
 
    Res = (catch load_table(Work)),
 
310
    %% I'll need this cause it's linked trough the subscriber
 
311
    %% might be solved by using monitor in subscr instead.
 
312
    process_flag(trap_exit, true),
 
313
    Load = load_table_fun(Work),
 
314
    Res = (catch Load()),
 
315
    process_flag(trap_exit, false),
306
316
    call({del_other, self()}),
307
317
    case Res of
308
318
        #loader_done{is_loaded = true} ->
441
451
    Res = try_merge_schema(New),
442
452
    Msg = {schema_is_merged, [], late_merge, []},
443
453
    multicall([node()|Ns], Msg),
444
 
    Father ! {?MODULE, self(), Res, New},
 
454
    After = val({current, db_nodes}),    
 
455
    Father ! {?MODULE, self(), Res, mnesia_lib:intersect(Ns,After--Current)},
445
456
    unlink(Father),
446
457
    ok.
447
458
    
577
588
    {ok, Ref} = timer:send_interval(Interval, Msg),
578
589
    mnesia_dumper:start_regulator(),
579
590
    
580
 
    {ok, #state{supervisor = Parent, dump_log_timer_ref = Ref}}.
 
591
    Empty = gb_trees:empty(),
 
592
    {ok, #state{supervisor = Parent, dump_log_timer_ref = Ref, 
 
593
                loader_queue = Empty,
 
594
                late_loader_queue = Empty}}.
581
595
 
582
596
%%----------------------------------------------------------------------
583
597
%% Func: handle_call/3
606
620
    State2 = add_worker(Worker, State),
607
621
    noreply(State2);
608
622
 
 
623
handle_call({update,Fun}, From, State) ->
 
624
    Res = (catch Fun()),
 
625
    reply(From, Res), 
 
626
    noreply(State);
 
627
 
609
628
handle_call(get_cstructs, From, State) ->
610
629
    Tabs = val({schema, tables}),
611
630
    Cstructs = [val({T, cstruct}) || T <- Tabs],
635
654
    State3 = State2#state{early_msgs = [], schema_is_merged = true},
636
655
    handle_early_msgs(lists:reverse(Msgs), State3);
637
656
 
638
 
handle_call(disc_load_intents, From, State) ->
639
 
    Tabs = disc_load_intents(State#state.loader_queue) ++
640
 
           disc_load_intents(State#state.late_loader_queue),
641
 
    ActiveTabs = mnesia_lib:local_active_tables(),
642
 
    reply(From, {ok, node(), mnesia_lib:union(Tabs, ActiveTabs)}),
 
657
handle_call(disc_load_intents,From,State = #state{loader_queue=LQ,late_loader_queue=LLQ}) ->
 
658
    LQTabs  = gb_trees:keys(LQ),
 
659
    LLQTabs = gb_trees:keys(LLQ),
 
660
    ActiveTabs = lists:sort(mnesia_lib:local_active_tables()),
 
661
    reply(From, {ok, node(), ordsets:union([LQTabs,LLQTabs,ActiveTabs])}),
643
662
    noreply(State);
644
663
 
645
664
handle_call({update_where_to_write, [add, Tab, AddNode], _From}, _Dummy, State) ->
648
667
        case lists:member(AddNode, Current) and 
649
668
            (State#state.schema_is_merged == true) of
650
669
            true ->
651
 
                mnesia_lib:add({Tab, where_to_write}, AddNode);
 
670
                mnesia_lib:add_lsort({Tab, where_to_write}, AddNode);
652
671
            false ->
653
672
                ignore
654
673
        end,
663
682
            reply(ReplyTo, ignore),
664
683
            noreply(State);
665
684
        Merged == true ->
666
 
            Res = add_active_replica(Tab, ToNode, RemoteS, AccessMode),
 
685
            Res = case ?catch_val({Tab, cstruct}) of
 
686
                      {'EXIT', _} ->  %% Tab deleted
 
687
                          deleted;
 
688
                      _ ->
 
689
                          add_active_replica(Tab, ToNode, RemoteS, AccessMode)
 
690
                  end,
667
691
            reply(ReplyTo, Res),
668
692
            noreply(State);
669
693
        true -> %% Schema is not merged
750
774
    error("~p got unexpected call: ~p~n", [?SERVER_NAME, Msg]),
751
775
    noreply(State).
752
776
 
753
 
disc_load_intents([H | T]) when record(H, disc_load) ->
754
 
    [H#disc_load.table | disc_load_intents(T)];
755
 
disc_load_intents([H | T]) when record(H, late_load) ->
756
 
    [H#late_load.table | disc_load_intents(T)];
757
 
disc_load_intents( [H | T]) when record(H, net_load) ->
758
 
    disc_load_intents(T);
759
 
disc_load_intents([]) ->
760
 
    [].
761
 
 
762
 
late_disc_load(TabsR, Reason, RemoteLoaders, From, State) ->
 
777
late_disc_load(TabsR, Reason, RemoteLoaders, From, 
 
778
               State = #state{loader_queue = LQ, late_loader_queue = LLQ}) ->
763
779
    verbose("Intend to load tables: ~p~n", [TabsR]),
764
780
    ?eval_debug_fun({?MODULE, late_disc_load},
765
781
                    [{tabs, TabsR}, 
769
785
    reply(From, queued),
770
786
    %% RemoteLoaders is a list of {ok, Node, Tabs} tuples
771
787
 
772
 
    %% Remove deleted tabs
773
 
    LocalTabs = mnesia_lib:val({schema, local_tables}),
774
 
    Filter = fun({Tab, Reas}, Acc) -> 
775
 
                     case lists:member(Tab, LocalTabs) of
776
 
                         true -> [{Tab, Reas} | Acc];
777
 
                         false -> Acc
778
 
                     end;
779
 
                (Tab, Acc) ->
780
 
                     case lists:member(Tab, LocalTabs) of
781
 
                         true -> [Tab | Acc];
782
 
                         false -> Acc
 
788
    %% Remove deleted tabs and queued/loaded
 
789
    LocalTabs = gb_sets:from_ordset(lists:sort(mnesia_lib:val({schema,local_tables}))),
 
790
    Filter = fun(TabInfo0, Acc) -> 
 
791
                     TabInfo = {Tab,_} = 
 
792
                         case TabInfo0 of 
 
793
                             {_,_} -> TabInfo0;
 
794
                             TabN -> {TabN,Reason}
 
795
                         end,
 
796
                     case gb_sets:is_member(Tab, LocalTabs) of
 
797
                         true -> 
 
798
                             case ?catch_val({Tab, where_to_read}) == node() of
 
799
                                 true -> Acc;
 
800
                                 false ->
 
801
                                     case gb_trees:is_defined(Tab,LQ) of
 
802
                                         true ->  Acc;
 
803
                                         false -> [TabInfo | Acc]
 
804
                                     end
 
805
                             end;
 
806
                         false -> Acc
783
807
                     end
784
808
             end,
785
809
    
786
810
    Tabs = lists:foldl(Filter, [], TabsR),
787
811
    
788
812
    Nodes = val({current, db_nodes}),
789
 
    LateLoaders = late_loaders(Tabs, Reason, RemoteLoaders, Nodes),
790
 
    LateQueue = State#state.late_loader_queue ++ LateLoaders,
791
 
    State#state{late_loader_queue = LateQueue}.
792
 
 
793
 
late_loaders([{Tab, Reason} | Tabs], DefaultReason, RemoteLoaders, Nodes) ->
794
 
    LoadNodes = late_load_filter(RemoteLoaders, Tab, Nodes, []),
795
 
    case LoadNodes of
796
 
        [] ->
797
 
            cast({disc_load, Tab, Reason}); % Ugly cast
798
 
        _ ->
799
 
            ignore
800
 
    end,
801
 
    LateLoad = #late_load{table = Tab, loaders = LoadNodes, reason = Reason},
802
 
    [LateLoad | late_loaders(Tabs, DefaultReason, RemoteLoaders, Nodes)];
803
 
 
804
 
late_loaders([Tab | Tabs], Reason, RemoteLoaders, Nodes) ->
805
 
    Loaders = late_load_filter(RemoteLoaders, Tab, Nodes, []),
806
 
    case Loaders of
807
 
        [] ->
808
 
            cast({disc_load, Tab, Reason});  % Ugly cast
809
 
        _ ->
810
 
            ignore
811
 
    end,
812
 
    LateLoad = #late_load{table = Tab, loaders = Loaders, reason = Reason},
813
 
    [LateLoad | late_loaders(Tabs, Reason, RemoteLoaders, Nodes)];
814
 
late_loaders([], _Reason, _RemoteLoaders, _Nodes) ->
815
 
    [].
 
813
    LateQueue = late_loaders(Tabs, RemoteLoaders, Nodes, LLQ),
 
814
    State#state{late_loader_queue = LateQueue}. 
 
815
 
 
816
late_loaders([{Tab, Reason} | Tabs], RemoteLoaders, Nodes, LLQ) ->
 
817
    case gb_trees:is_defined(Tab, LLQ) of
 
818
        false ->
 
819
            LoadNodes = late_load_filter(RemoteLoaders, Tab, Nodes, []),
 
820
            case LoadNodes of
 
821
                [] ->  cast({disc_load, Tab, Reason}); % Ugly cast
 
822
                _ ->   ignore
 
823
            end,
 
824
            LateLoad = #late_load{table=Tab,loaders=LoadNodes,reason=Reason},
 
825
            late_loaders(Tabs, RemoteLoaders, Nodes, gb_trees:insert(Tab,LateLoad,LLQ));
 
826
        true ->
 
827
            late_loaders(Tabs, RemoteLoaders, Nodes, LLQ)            
 
828
    end;
 
829
late_loaders([], _RemoteLoaders, _Nodes, LLQ) ->
 
830
    LLQ.
816
831
 
817
832
late_load_filter([{error, _} | RemoteLoaders], Tab, Nodes, Acc) ->
818
833
    late_load_filter(RemoteLoaders, Tab, Nodes, Acc);
906
921
    %% Fix internal stuff
907
922
    LateQ = remove_loaders(Alltabs, Node, State#state.late_loader_queue),
908
923
    
909
 
    case State#state.loader_pid of
910
 
        undefined -> ignore;
911
 
        Pid2 when pid(Pid2) -> Pid2 ! {copier_done, Node}
912
 
    end,
913
 
    case get_senders(State) of
 
924
    case get_senders(State) ++ get_loaders(State) of
914
925
        [] -> ignore;
915
 
        Pids -> 
 
926
        Senders -> 
916
927
            lists:foreach(fun({Pid,_}) -> Pid ! {copier_done, Node} end,
917
 
                          Pids)
 
928
                          Senders)
918
929
    end,
919
930
    lists:foreach(fun(Pid) -> Pid ! {copier_done,Node} end, 
920
931
                  State#state.others),
929
940
                        late_loader_queue = LateQ
930
941
                       });
931
942
 
932
 
handle_cast({im_running, _Node, NewFriends}, State) ->
933
 
    Tabs = mnesia_lib:local_active_tables() -- [schema],
934
 
    Ns = mnesia_lib:intersect(NewFriends, val({current, db_nodes})),
935
 
    abcast(Ns, {adopt_orphans, node(), Tabs}),
936
 
    noreply(State);
937
 
 
938
943
handle_cast({merging_schema, Node}, State) ->
939
944
    case State#state.schema_is_merged of
940
945
        false ->
958
963
    Msgs = State#state.early_msgs,
959
964
    noreply(State#state{early_msgs = [{cast, Msg} | Msgs]});
960
965
 
 
966
%% This must be done after schema_is_merged otherwise adopt_orphan
 
967
%% might trigger a table load from wrong nodes as a result of that we don't 
 
968
%% know which tables we can load safly first.
 
969
handle_cast({im_running, _Node, NewFriends}, State) ->
 
970
    Tabs = mnesia_lib:local_active_tables() -- [schema],
 
971
    Ns = mnesia_lib:intersect(NewFriends, val({current, db_nodes})),
 
972
    abcast(Ns, {adopt_orphans, node(), Tabs}),
 
973
    noreply(State);
 
974
 
961
975
handle_cast({disc_load, Tab, Reason}, State) ->
962
976
    Worker = #disc_load{table = Tab, reason = Reason},
963
977
    State2 = add_worker(Worker, State),
1020
1034
    
1021
1035
    %% Register the other node as up and running
1022
1036
    mnesia_recover:log_mnesia_up(Node),
1023
 
    verbose("Logging mnesia_up ~w~n", [Node]),
 
1037
    verbose("Logging mnesia_up ~w~n",[Node]),
1024
1038
    mnesia_lib:report_system_event({mnesia_up, Node}),
1025
1039
    
1026
1040
    %% Load orphan tables
1037
1051
                    [Tab || {Tab, Ns} <- RemoteMasters,
1038
1052
                            lists:member(N, Ns)],
1039
1053
                mnesia_late_loader:maybe_async_late_disc_load(N, RemoteOrphans, Reason)
1040
 
          end,
 
1054
        end,
1041
1055
    lists:foreach(Fun, Nodes),
1042
 
    
1043
 
    Queue = State2#state.loader_queue,
1044
 
    State3 = State2#state{loader_queue = Queue},
1045
 
    noreply(State3);
 
1056
    noreply(State2);
1046
1057
 
1047
1058
handle_cast(Msg, State) ->
1048
1059
    error("~p got unexpected cast: ~p~n", [?SERVER_NAME, Msg]),
1049
1060
    noreply(State).
1050
1061
 
1051
 
handle_sync_tabs([Tab | Tabs], From) ->
 
1062
handle_sync_tabs([Tab | Tabs], From) ->    
1052
1063
    case val({Tab, where_to_read}) of
1053
1064
        nowhere ->
1054
1065
            case get({sync_tab, Tab}) of
1094
1105
            {stop, fatal, State}
1095
1106
    end;
1096
1107
 
1097
 
handle_info(Done, State) when record(Done, loader_done) ->
1098
 
    if
1099
 
        %% Assertion
1100
 
        Done#loader_done.worker_pid == State#state.loader_pid -> ok
1101
 
    end,
1102
 
            
1103
 
    [_Worker | Rest] = State#state.loader_queue,
1104
 
    LateQueue0 = State#state.late_loader_queue,
1105
 
    {LoadQ, LateQueue} =
 
1108
handle_info(Done, State0) when record(Done, loader_done) ->    
 
1109
    WPid = Done#loader_done.worker_pid,
 
1110
    LateQueue0 = State0#state.late_loader_queue,
 
1111
    Tab = Done#loader_done.table_name,
 
1112
    State1 = State0#state{loader_pid = lists:keydelete(WPid,1,get_loaders(State0))},
 
1113
 
 
1114
    State2 =
1106
1115
        case Done#loader_done.is_loaded of
1107
1116
            true ->
1108
 
                Tab = Done#loader_done.table_name,
1109
 
                
1110
 
                %% Optional user sync
1111
 
                case Done#loader_done.needs_sync of
1112
 
                    true -> user_sync_tab(Tab);
1113
 
                    false -> ignore
1114
 
                end,
1115
 
                
1116
1117
                %% Optional table announcement
1117
1118
                if 
1118
1119
                    Done#loader_done.needs_announce == true,
1139
1140
                        Ns = val({current, db_nodes}),
1140
1141
                        AlreadyKnows = val({Tab, active_replicas}),
1141
1142
                        abcast(Ns -- AlreadyKnows, {i_have_tab, Tab, node()})
1142
 
                end,            
1143
 
                {Rest, reply_late_load(Tab, LateQueue0)};
 
1143
                end,
 
1144
                %% Optional user sync
 
1145
                case Done#loader_done.needs_sync of
 
1146
                    true -> user_sync_tab(Tab);
 
1147
                    false -> ignore
 
1148
                end,
 
1149
                State1#state{late_loader_queue=gb_trees:delete_any(Tab, LateQueue0)};
1144
1150
            false ->
 
1151
                %% Either the node went down or table was not
 
1152
                %% loaded remotly yet 
1145
1153
                case Done#loader_done.needs_reply of
1146
1154
                    true ->
1147
1155
                        reply(Done#loader_done.reply_to,
1149
1157
                    false ->
1150
1158
                        ignore
1151
1159
                end,
1152
 
                {Rest, LateQueue0}
 
1160
                case ?catch_val({Tab, active_replicas}) of
 
1161
                    [_|_] -> % still available elsewhere
 
1162
                        {value,{_,Worker}} = lists:keysearch(WPid,1,get_loaders(State0)),
 
1163
                        add_loader(Tab,Worker,State1);
 
1164
                    _ ->
 
1165
                        State1
 
1166
                end
1153
1167
        end,
1154
 
 
1155
 
    State2 = State#state{loader_pid = undefined,
1156
 
                         loader_queue = LoadQ,
1157
 
                         late_loader_queue = LateQueue},
1158
 
 
1159
1168
    State3 = opt_start_worker(State2),
1160
1169
    noreply(State3);
1161
1170
 
1188
1197
 
1189
1198
handle_info({'EXIT', Pid, R}, State) when Pid == State#state.dumper_pid ->
1190
1199
    case State#state.dumper_queue of
1191
 
        [#schema_commit_lock{}|Workers] ->  %% Schema trans crashed or was killed
 
1200
        [#schema_commit_lock{}|Workers] -> %% Schema trans crashed or was killed
 
1201
            dbg_out("WARNING: Dumper ~p exited ~p~n", [Pid, R]),
1192
1202
            State2 = State#state{dumper_queue = Workers, dumper_pid = undefined},
1193
1203
            State3 = opt_start_worker(State2),
1194
1204
            noreply(State3);
1197
1207
            {stop, fatal, State}
1198
1208
    end;
1199
1209
 
1200
 
handle_info({'EXIT', Pid, R}, State) when Pid == State#state.loader_pid ->
1201
 
    fatal("Loader crashed: ~p~n state: ~p~n", [R, State]),
1202
 
    {stop, fatal, State};
1203
 
 
1204
1210
handle_info(Msg = {'EXIT', Pid, R}, State) when R /= wait_for_tables_timeout ->
1205
1211
    case lists:keymember(Pid, 1, get_senders(State)) of
1206
1212
        true ->
1209
1215
            fatal("Sender crashed: ~p~n state: ~p~n", [{Pid,R}, State]),
1210
1216
            {stop, fatal, State};
1211
1217
        false ->
1212
 
            error("~p got unexpected info: ~p~n", [?SERVER_NAME, Msg]),
1213
 
            noreply(State)
 
1218
            case lists:keymember(Pid, 1, get_loaders(State)) of
 
1219
                true -> 
 
1220
                    fatal("Loader crashed: ~p~n state: ~p~n", [R, State]),
 
1221
                    {stop, fatal, State};
 
1222
                false ->
 
1223
                    error("~p got unexpected info: ~p~n", [?SERVER_NAME, Msg]),
 
1224
                    noreply(State)
 
1225
            end
1214
1226
    end;
1215
1227
 
1216
1228
handle_info({From, get_state}, State) ->
1231
1243
    error("~p got unexpected info: ~p~n", [?SERVER_NAME, Msg]),
1232
1244
    noreply(State).
1233
1245
 
1234
 
reply_late_load(Tab, [H | T]) when H#late_load.table == Tab ->
1235
 
    reply(H#late_load.opt_reply_to, ok),
1236
 
    reply_late_load(Tab, T);
1237
 
reply_late_load(Tab, [H | T])  ->
1238
 
    [H | reply_late_load(Tab, T)];
1239
 
reply_late_load(_Tab, []) ->
1240
 
    [].
1241
 
 
1242
1246
sync_tab_timeout(Pid, [{{sync_tab, Tab}, Pids} | Tail]) ->
1243
1247
    case lists:delete(Pid, Pids) of
1244
1248
        [] ->
1255
1259
%% Pick the load record that has the highest load order
1256
1260
%% Returns {BestLoad, RemainingQueue} or {none, []} if queue is empty
1257
1261
pick_next(Queue) ->
1258
 
    pick_next(Queue, none, none, []).
 
1262
    List = gb_trees:values(Queue),
 
1263
    case pick_next(List, none, none) of
 
1264
        none -> {none, gb_trees:empty()};
 
1265
        {Tab, Worker} -> {Worker, gb_trees:delete(Tab,Queue)}
 
1266
    end.
1259
1267
 
1260
 
pick_next([Head | Tail], Load, Order, Rest) when record(Head, net_load) ->
 
1268
pick_next([Head | Tail], Load, Order) when record(Head, net_load) ->
1261
1269
    Tab = Head#net_load.table,
1262
 
    select_best(Head, Tail, val({Tab, load_order}), Load, Order, Rest);
1263
 
pick_next([Head | Tail], Load, Order, Rest) when record(Head, disc_load) ->
 
1270
    select_best(Head, Tail, ?catch_val({Tab, load_order}), Load, Order);
 
1271
pick_next([Head | Tail], Load, Order) when record(Head, disc_load) ->
1264
1272
    Tab = Head#disc_load.table,
1265
 
    select_best(Head, Tail, val({Tab, load_order}), Load, Order, Rest);
1266
 
pick_next([], Load, _Order, Rest) ->
1267
 
    {Load, Rest}.
 
1273
    select_best(Head, Tail, ?catch_val({Tab, load_order}), Load, Order);
 
1274
pick_next([], none, _Order) ->
 
1275
    none;
 
1276
pick_next([], Load, _Order) ->
 
1277
    {element(2,Load), Load}.
1268
1278
 
1269
 
select_best(Load, Tail, Order, none, none, Rest) ->
1270
 
    pick_next(Tail, Load, Order, Rest);
1271
 
select_best(Load, Tail, Order, OldLoad, OldOrder, Rest) when Order > OldOrder ->
1272
 
    pick_next(Tail, Load, Order, [OldLoad | Rest]);
1273
 
select_best(Load, Tail, _Order, OldLoad, OldOrder, Rest) ->
1274
 
    pick_next(Tail, OldLoad, OldOrder, [Load | Rest]).
 
1279
select_best(_Head, Tail, {'EXIT', _WHAT}, Load, Order) ->
 
1280
    %% Table have been deleted drop it.
 
1281
    pick_next(Tail, Load, Order);
 
1282
select_best(Load, Tail, Order, none, none) ->
 
1283
    pick_next(Tail, Load, Order);
 
1284
select_best(Load, Tail, Order, _OldLoad, OldOrder) when Order > OldOrder ->
 
1285
    pick_next(Tail, Load, Order);
 
1286
select_best(_Load, Tail, _Order, OldLoad, OldOrder) ->
 
1287
    pick_next(Tail, OldLoad, OldOrder).
1275
1288
 
1276
1289
%%----------------------------------------------------------------------
1277
1290
%% Func: terminate/2
1286
1299
%% Purpose: Upgrade process when its code is to be changed
1287
1300
%% Returns: {ok, NewState}
1288
1301
%%----------------------------------------------------------------------
1289
 
code_change(_OldVsn, State, _Extra) ->
 
1302
code_change(_OldVsn, State0, _Extra) ->
 
1303
    %% Loader Queue
 
1304
    State1 = case State0#state.loader_pid of
 
1305
                 Pids when is_list(Pids) -> State0;
 
1306
                 undefined -> State0#state{loader_pid = [],loader_queue=gb_trees:empty()};
 
1307
                 Pid when is_pid(Pid) -> 
 
1308
                     [Loader|Rest] = State0#state.loader_queue,
 
1309
                     LQ0 = [{element(2,Rec),Rec} || Rec <- Rest],
 
1310
                     LQ1 = lists:sort(LQ0),
 
1311
                     LQ  = gb_trees:from_orddict(LQ1),
 
1312
                     State0#state{loader_pid=[{Pid,Loader}], loader_queue=LQ}
 
1313
             end,
 
1314
    %% LateLoaderQueue
 
1315
    State = if is_list(State1#state.late_loader_queue) -> 
 
1316
                    LLQ0 = State1#state.late_loader_queue,
 
1317
                    LLQ1 = lists:sort([{element(2,Rec),Rec} || Rec <- LLQ0]),
 
1318
                    LLQ  = gb_trees:from_orddict(LLQ1),
 
1319
                    State1#state{late_loader_queue=LLQ};
 
1320
               true ->
 
1321
                    State1
 
1322
            end,
1290
1323
    {ok, State}.
1291
 
 
 
1324
 
1292
1325
%%%----------------------------------------------------------------------
1293
1326
%%% Internal functions
1294
1327
%%%----------------------------------------------------------------------
1337
1370
    LocalContent = Cs#cstruct.local_content,
1338
1371
    RamCopyHoldersOnDiscNodes = mnesia_lib:intersect(RamCopyHolders, DiscNodes),
1339
1372
    Active = val({Tab, active_replicas}),
 
1373
    BeingCreated = (?catch_val({Tab, create_table}) == true),
 
1374
    Read = val({Tab, where_to_read}),
1340
1375
    case lists:member(Node, DiscCopyHolders) of
 
1376
        _ when BeingCreated == true -> 
 
1377
            orphan_tables(Tabs, Node, Ns, Local, Remote);
 
1378
        _ when Read == node() -> %% Allready loaded
 
1379
            orphan_tables(Tabs, Node, Ns, Local, Remote);
1341
1380
        true when Active == [] ->
1342
1381
            case DiscCopyHolders -- Ns of
1343
1382
                [] ->
1372
1411
    {LocalOrphans, RemoteMasters}.
1373
1412
 
1374
1413
node_has_tabs([Tab | Tabs], Node, State) when Node /= node() ->
1375
 
    State2 = update_whereabouts(Tab, Node, State),
 
1414
    State2 = 
 
1415
        case catch update_whereabouts(Tab, Node, State) of
 
1416
            State1 = #state{} -> State1;
 
1417
            {'EXIT', R} ->  %% Tab was just deleted?
 
1418
                case ?catch_val({Tab, cstruct}) of
 
1419
                    {'EXIT', _} -> State; % yes
 
1420
                    _ ->  erlang:fault(R) 
 
1421
                end
 
1422
        end,
1376
1423
    node_has_tabs(Tabs, Node, State2);
1377
1424
node_has_tabs([Tab | Tabs], Node, State) ->
1378
1425
    user_sync_tab(Tab),
1403
1450
        LocalC == true ->
1404
1451
            %% Local contents, don't care about other node
1405
1452
            State;
 
1453
        BeingCreated == true ->     
 
1454
            %% The table is currently being created
 
1455
            %% It will be handled elsewhere
 
1456
            State;
1406
1457
        Storage == unknown, Read == nowhere ->
1407
1458
            %% No own copy, time to read remotely
1408
1459
            %% if the other node is a good node
1430
1481
            end,
1431
1482
            user_sync_tab(Tab),
1432
1483
            State;
1433
 
        BeingCreated == true ->
1434
 
            %% The table is currently being created
1435
 
            %% and we shall have an own copy of it.
1436
 
            %% We will load the (empty) table locally.
1437
 
            add_active_replica(Tab, Node),
1438
 
            State;
1439
1484
        Read == nowhere ->
1440
1485
            %% Own copy, go and get a copy of the table
1441
1486
            %% if the other node is master or if there
1561
1606
    [M|remove_early_messages(R,Node)].
1562
1607
 
1563
1608
%% Drop loader from late load queue and possibly trigger a disc_load
1564
 
drop_loaders(Tab, Node, [H | T]) when H#late_load.table == Tab ->
1565
 
    %% Check if it is time to issue a disc_load request
1566
 
    case H#late_load.loaders of
1567
 
        [Node] ->
1568
 
            Reason = {H#late_load.reason, last_loader_down, Node},
1569
 
            cast({disc_load, Tab, Reason});  % Ugly cast
1570
 
        _ ->
1571
 
            ignore
1572
 
    end,
1573
 
    %% Drop the node from the list of loaders
1574
 
    H2 = H#late_load{loaders = H#late_load.loaders -- [Node]},
1575
 
    [H2 | drop_loaders(Tab, Node, T)];
1576
 
drop_loaders(Tab, Node, [H | T]) ->
1577
 
    [H | drop_loaders(Tab, Node, T)];
1578
 
drop_loaders(_, _, []) ->
1579
 
    [].
 
1609
drop_loaders(Tab, Node, LLQ) ->
 
1610
    case gb_trees:lookup(Tab,LLQ) of
 
1611
        none ->
 
1612
            LLQ;
 
1613
        {value, H} ->
 
1614
            %% Check if it is time to issue a disc_load request
 
1615
            case H#late_load.loaders of
 
1616
                [Node] ->
 
1617
                    Reason = {H#late_load.reason, last_loader_down, Node},
 
1618
                    cast({disc_load, Tab, Reason});  % Ugly cast
 
1619
                _ ->
 
1620
                    ignore
 
1621
            end,
 
1622
            %% Drop the node from the list of loaders
 
1623
            H2 = H#late_load{loaders = H#late_load.loaders -- [Node]},
 
1624
            gb_trees:update(Tab, H2, LLQ)
 
1625
    end.
1580
1626
 
1581
1627
add_active_replica(Tab, Node) ->
1582
1628
    add_active_replica(Tab, Node, val({Tab, cstruct})).
1617
1663
        read_write ->
1618
1664
            New = lists:sort([{Node, Storage} | Del]),
1619
1665
            set(Var, mark_blocked_tab(Blocked, New)), % where_to_commit
1620
 
            add({Tab, where_to_write}, Node);
 
1666
            mnesia_lib:add_lsort({Tab, where_to_write}, Node);
1621
1667
        read_only ->
1622
1668
            set(Var, mark_blocked_tab(Blocked, Del)),
1623
1669
            mnesia_lib:del({Tab, where_to_write}, Node)
1634
1680
    mnesia_lib:del({Tab, where_to_write}, Node).
1635
1681
 
1636
1682
change_table_access_mode(Cs) ->
1637
 
    Tab = Cs#cstruct.name,
1638
 
    lists:foreach(fun(N) -> add_active_replica(Tab, N, Cs) end,
1639
 
                  val({Tab, active_replicas})).
 
1683
    W = fun() -> 
 
1684
                Tab = Cs#cstruct.name,
 
1685
                lists:foreach(fun(N) -> add_active_replica(Tab, N, Cs) end,
 
1686
                              val({Tab, active_replicas}))
 
1687
        end,
 
1688
    update(W).
 
1689
            
1640
1690
 
1641
1691
%% node To now has tab loaded, but this must be undone
1642
1692
%% This code is rpc:call'ed from the tab_copier process
1704
1754
        Pid ->
1705
1755
            Pid ! {self(), get_state},
1706
1756
            receive
1707
 
                {?SERVER_NAME, State} when record(State, state) ->
1708
 
                    {info,State}
 
1757
                {?SERVER_NAME, State = #state{loader_queue=LQ,late_loader_queue=LLQ}} ->
 
1758
                    {info,State#state{loader_queue=gb_trees:to_list(LQ),
 
1759
                                      late_loader_queue=gb_trees:to_list(LLQ)}}
1709
1760
            after Timeout ->
1710
1761
                    {timeout, Timeout}
1711
1762
            end
1719
1770
            Pid ! {self(), get_state},
1720
1771
            receive
1721
1772
                {?SERVER_NAME, State} when record(State, state) ->
1722
 
                    {workers, State#state.loader_pid, get_senders(State), State#state.dumper_pid}
 
1773
                    {workers, get_loaders(State), get_senders(State), State#state.dumper_pid}
1723
1774
            after Timeout ->
1724
1775
                    {timeout, Timeout}
1725
1776
            end
1816
1867
    State2 = State#state{dumper_queue = Queue2},
1817
1868
    opt_start_worker(State2);
1818
1869
add_worker(Worker, State) when record(Worker, net_load) ->
1819
 
    Queue = State#state.loader_queue,
1820
 
    State2 = State#state{loader_queue = Queue ++ [Worker]},
1821
 
    opt_start_worker(State2);
 
1870
    opt_start_worker(add_loader(Worker#net_load.table,Worker,State));
1822
1871
add_worker(Worker, State) when record(Worker, send_table) ->
1823
1872
    Queue = State#state.sender_queue,
1824
1873
    State2 = State#state{sender_queue = Queue ++ [Worker]},
1825
1874
    opt_start_worker(State2);
1826
1875
add_worker(Worker, State) when record(Worker, disc_load) ->
1827
 
    Queue = State#state.loader_queue,
1828
 
    State2 = State#state{loader_queue = Queue ++ [Worker]},
1829
 
    opt_start_worker(State2);
 
1876
    opt_start_worker(add_loader(Worker#disc_load.table,Worker,State));
1830
1877
% Block controller should be used for upgrading mnesia.
1831
1878
add_worker(Worker, State) when record(Worker, block_controller) -> 
1832
1879
    Queue = State#state.dumper_queue,
1834
1881
    State2 = State#state{dumper_queue = Queue2},
1835
1882
    opt_start_worker(State2).
1836
1883
 
 
1884
add_loader(Tab,Worker,State = #state{loader_queue=LQ0}) ->
 
1885
    case gb_trees:is_defined(Tab, LQ0) of
 
1886
        true -> State;
 
1887
        false -> 
 
1888
            LQ=gb_trees:insert(Tab, Worker, LQ0),
 
1889
            State#state{loader_queue=LQ}
 
1890
    end.
 
1891
 
1837
1892
%% Optionally start a worker
1838
1893
%% 
1839
1894
%% Dumpers and loaders may run simultaneously
1869
1924
                    opt_start_loader(State3);
1870
1925
                
1871
1926
                record(Worker, block_controller) ->
1872
 
                    case {get_senders(State), State#state.loader_pid} of
1873
 
                        {[], undefined} ->
 
1927
                    case {get_senders(State), get_loaders(State)} of
 
1928
                        {[], []} ->
1874
1929
                            ReplyTo = Worker#block_controller.owner,
1875
1930
                            reply(ReplyTo, granted),
1876
1931
                            {Owner, _Tag} = ReplyTo,
1887
1942
 
1888
1943
opt_start_sender(State) ->
1889
1944
    case State#state.sender_queue of
1890
 
        []->
1891
 
            %% No need
1892
 
            State;
 
1945
        []->   State;       %% No need
1893
1946
        SenderQ -> 
1894
1947
            {NewS,Kept} = opt_start_sender2(SenderQ, get_senders(State), 
1895
 
                                            [], State#state.loader_queue),
 
1948
                                            [], get_loaders(State)),
1896
1949
            State#state{sender_pid = NewS, sender_queue = Kept}
1897
1950
    end.
1898
1951
 
1900
1953
opt_start_sender2([Sender|R], Pids, Kept, LoaderQ) ->
1901
1954
    Tab = Sender#send_table.table,
1902
1955
    Active = val({Tab, active_replicas}),
1903
 
    IgotIt = lists:member(node(), Active),
 
1956
    IgotIt = lists:member(node(), Active),    
 
1957
    IsLoading = lists:any(fun({_Pid,Loader}) -> 
 
1958
                                  Tab == element(#net_load.table, Loader)
 
1959
                          end, LoaderQ),
1904
1960
    if 
1905
 
        (hd(LoaderQ))#net_load.table == Tab ->
1906
 
            %% I'm currently loading the table let him wait
 
1961
        IgotIt, IsLoading  ->
 
1962
            %% I'm currently finishing loading the table let him wait
1907
1963
            opt_start_sender2(R,Pids, [Sender|Kept], LoaderQ);
1908
1964
        IgotIt ->
1909
1965
            %% Start worker but keep him in the queue
1910
 
            Pid = spawn_link(?MODULE, send_and_reply,
1911
 
                             [self(), Sender]),
 
1966
            Pid = spawn_link(?MODULE, send_and_reply,[self(), Sender]),
1912
1967
            opt_start_sender2(R,[{Pid,Sender}|Pids],Kept,LoaderQ);
1913
 
        length(Active) > 0 ->
1914
 
            %% Someone else has loaded the table redirect him with a fail msg.
 
1968
        true ->
 
1969
            verbose("Send table failed ~p not active on this node ~n", [Tab]),
1915
1970
            Sender#send_table.receiver_pid ! {copier_done, node()},
1916
 
            opt_start_sender2(R,Pids, Kept, LoaderQ);
1917
 
        true ->
1918
 
            %% No one else is active and the remote loader has decided 
1919
 
            %% that I should load the table first => Keep him in queue
1920
 
            opt_start_sender2(R,Pids, [Sender|Kept], LoaderQ)
 
1971
            opt_start_sender2(R,Pids, Kept, LoaderQ)
1921
1972
    end.
1922
1973
 
1923
 
opt_start_loader(State) ->
1924
 
    LoaderQueue = State#state.loader_queue,
1925
 
    if
1926
 
        LoaderQueue == [] ->
1927
 
            %% No need
1928
 
            State;
1929
 
 
1930
 
        State#state.loader_pid /= undefined ->
1931
 
            %% Bad luck, an loader is already running
1932
 
            State;
1933
 
        
1934
 
        true ->
 
1974
opt_start_loader(State = #state{loader_queue = LoaderQ}) ->
 
1975
    Current = get_loaders(State),
 
1976
    Max = max_loaders(),
 
1977
    case gb_trees:is_empty(LoaderQ) of
 
1978
        true -> 
 
1979
            State;
 
1980
        _ when length(Current) >= Max -> 
 
1981
            State;
 
1982
        false -> 
1935
1983
            SchemaQueue = State#state.dumper_queue,
1936
 
            case lists:keymember(schema_commit, 1, SchemaQueue) of
 
1984
            case lists:keymember(schema_commit_lock, 1, SchemaQueue) of
1937
1985
                false ->
1938
 
                    {Worker, Rest} = pick_next(LoaderQueue),
1939
 
 
1940
 
                    %% Start worker but keep him in the queue
1941
 
                    Pid = spawn_link(?MODULE, load_and_reply, [self(), Worker]),
1942
 
                    State#state{loader_pid = Pid,
1943
 
                                loader_queue = [Worker | Rest]};
 
1986
                    case pick_next(LoaderQ) of
 
1987
                        {none,Rest} ->
 
1988
                            State#state{loader_queue=Rest};
 
1989
                        {Worker,Rest} ->
 
1990
                            case already_loading(Worker, get_loaders(State)) of
 
1991
                                true ->
 
1992
                                    opt_start_loader(State#state{loader_queue = Rest});
 
1993
                                false ->
 
1994
                                    %% Start worker but keep him in the queue
 
1995
                                    Pid = load_and_reply(self(), Worker),
 
1996
                                    State#state{loader_pid=[{Pid,Worker}|get_loaders(State)],
 
1997
                                                loader_queue = Rest}
 
1998
                            end
 
1999
                    end;
1944
2000
                true ->
1945
2001
                    %% Bad luck, we must wait for the schema commit
1946
2002
                    State
1947
2003
            end
1948
2004
    end.
1949
2005
 
 
2006
already_loading(#net_load{table=Tab},Loaders) ->
 
2007
    already_loading2(Tab,Loaders);
 
2008
already_loading(#disc_load{table=Tab},Loaders) ->
 
2009
    already_loading2(Tab,Loaders).
 
2010
 
 
2011
already_loading2(Tab, [{_,#net_load{table=Tab}}|_]) -> true;
 
2012
already_loading2(Tab, [{_,#disc_load{table=Tab}}|_]) -> true;
 
2013
already_loading2(Tab, [_|Rest]) -> already_loading2(Tab,Rest);     
 
2014
already_loading2(_,[]) -> false.
 
2015
 
1950
2016
start_remote_sender(Node, Tab, Receiver, Storage) ->
1951
2017
    Msg = #send_table{table = Tab,
1952
2018
                      receiver_pid = Receiver,
1971
2037
    unlink(ReplyTo),
1972
2038
    exit(normal).
1973
2039
 
1974
 
 
1975
2040
load_and_reply(ReplyTo, Worker) ->
1976
 
    process_flag(trap_exit, true),
1977
 
    Done = load_table(Worker),
1978
 
    ReplyTo ! Done#loader_done{worker_pid = self()},
1979
 
    unlink(ReplyTo),
1980
 
    exit(normal).
 
2041
    Load = load_table_fun(Worker),
 
2042
    SendAndReply = 
 
2043
        fun() -> 
 
2044
                process_flag(trap_exit, true),
 
2045
                Done = Load(),
 
2046
                ReplyTo ! Done#loader_done{worker_pid = self()},
 
2047
                unlink(ReplyTo),
 
2048
                exit(normal)
 
2049
        end,
 
2050
    spawn_link(SendAndReply).
1981
2051
 
1982
2052
%% Now it is time to load the table
1983
2053
%% but first we must check if it still is neccessary
1984
 
load_table(Load) when record(Load, net_load) ->
 
2054
load_table_fun(Load) when record(Load, net_load) ->
1985
2055
    Tab = Load#net_load.table,
1986
2056
    ReplyTo = Load#net_load.opt_reply_to,
1987
2057
    Reason =  Load#net_load.reason,
2000
2070
    if
2001
2071
        ReadNode == node() ->
2002
2072
            %% Already loaded locally
2003
 
            Done;
 
2073
            fun() -> Done end;
2004
2074
        LocalC == true ->
2005
 
            Res = mnesia_loader:disc_load_table(Tab, load_local_content),
2006
 
            Done#loader_done{reply = Res, needs_announce = true, needs_sync = true};
 
2075
            fun() ->
 
2076
                    Res = mnesia_loader:disc_load_table(Tab, load_local_content),
 
2077
                    Done#loader_done{reply = Res, needs_announce = true, needs_sync = true}
 
2078
            end;
2007
2079
        AccessMode == read_only, Reason /= {dumper,add_table_copy} ->
2008
 
            disc_load_table(Tab, Reason, ReplyTo);
 
2080
            fun() -> disc_load_table(Tab, Reason, ReplyTo) end;
2009
2081
        true ->
2010
 
            %% Either we cannot read the table yet
2011
 
            %% or someone is moving a replica between
2012
 
            %% two nodes
2013
 
            Cs =  Load#net_load.cstruct,
2014
 
            Res = mnesia_loader:net_load_table(Tab, Reason, Active, Cs),
2015
 
            case Res of
2016
 
                {loaded, ok} ->
2017
 
                    Done#loader_done{needs_sync = true,
2018
 
                                     reply = Res};
2019
 
                {not_loaded, _} ->
2020
 
                    Done#loader_done{is_loaded = false,
2021
 
                                     reply = Res}
 
2082
            fun() ->
 
2083
                    %% Either we cannot read the table yet
 
2084
                    %% or someone is moving a replica between
 
2085
                    %% two nodes
 
2086
                    Cs =  Load#net_load.cstruct,
 
2087
                    Res = mnesia_loader:net_load_table(Tab, Reason, Active, Cs),
 
2088
                    case Res of
 
2089
                        {loaded, ok} ->
 
2090
                            Done#loader_done{needs_sync = true,
 
2091
                                             reply = Res};
 
2092
                        {not_loaded, _} ->
 
2093
                            Done#loader_done{is_loaded = false,
 
2094
                                             reply = Res}
 
2095
                    end
2022
2096
            end
2023
2097
    end;
2024
 
 
2025
 
load_table(Load) when record(Load, disc_load) ->
 
2098
load_table_fun(Load) when record(Load, disc_load) ->
2026
2099
    Tab = Load#disc_load.table,
2027
2100
    Reason =  Load#disc_load.reason,
2028
2101
    ReplyTo = Load#disc_load.opt_reply_to,
2037
2110
    if
2038
2111
        Active == [], ReadNode == nowhere ->
2039
2112
            %% Not loaded anywhere, lets load it from disc
2040
 
            disc_load_table(Tab, Reason, ReplyTo);
 
2113
            fun() -> disc_load_table(Tab, Reason, ReplyTo) end;
2041
2114
        ReadNode == nowhere ->
2042
2115
            %% Already loaded on other node, lets get it
2043
2116
            Cs = val({Tab, cstruct}),
2044
 
            case mnesia_loader:net_load_table(Tab, Reason, Active, Cs) of
2045
 
                {loaded, ok} ->
2046
 
                    Done#loader_done{needs_sync = true};
2047
 
                {not_loaded, storage_unknown} ->
2048
 
                    Done#loader_done{is_loaded = false};
2049
 
                {not_loaded, ErrReason} ->
2050
 
                    Done#loader_done{is_loaded = false,
2051
 
                                     reply = {not_loaded,ErrReason}}
 
2117
            fun() -> 
 
2118
                    case mnesia_loader:net_load_table(Tab, Reason, Active, Cs) of
 
2119
                        {loaded, ok} ->
 
2120
                            Done#loader_done{needs_sync = true};
 
2121
                        {not_loaded, storage_unknown} ->
 
2122
                            Done#loader_done{is_loaded = false};
 
2123
                        {not_loaded, ErrReason} ->
 
2124
                            Done#loader_done{is_loaded = false,
 
2125
                                             reply = {not_loaded,ErrReason}}
 
2126
                    end
2052
2127
            end;
2053
2128
        true ->
2054
2129
            %% Already readable, do not worry be happy
2055
 
            Done
 
2130
            fun() -> Done end
2056
2131
    end.
2057
2132
 
2058
2133
disc_load_table(Tab, Reason, ReplyTo) ->
2079
2154
 
2080
2155
filter_active(Tab) ->
2081
2156
    ByForce = val({Tab, load_by_force}),
2082
 
    Active = val({Tab, active_replicas}),
 
2157
    Active  = val({Tab, active_replicas}),
2083
2158
    Masters = mnesia_recover:get_master_nodes(Tab),
2084
2159
    Ns = do_filter_active(ByForce, Active, Masters),
2085
2160
    %% Reorder the so that we load from fastest first