~ubuntu-branches/ubuntu/trusty/erlang/trusty

« back to all changes in this revision

Viewing changes to lib/dialyzer/test/r9c_tests_SUITE_data/src/mnesia/mnesia_controller.erl

  • Committer: Bazaar Package Importer
  • Author(s): Clint Byrum
  • Date: 2011-05-05 15:48:43 UTC
  • mfrom: (3.5.13 sid)
  • Revision ID: james.westby@ubuntu.com-20110505154843-0om6ekzg6m7ugj27
Tags: 1:14.b.2-dfsg-3ubuntu1
* Merge from debian unstable.  Remaining changes:
  - Drop libwxgtk2.8-dev build dependency. Wx isn't in main, and not
    supposed to.
  - Drop erlang-wx binary.
  - Drop erlang-wx dependency from -megaco, -common-test, and -reltool, they
    do not really need wx. Also drop it from -debugger; the GUI needs wx,
    but it apparently has CLI bits as well, and is also needed by -megaco,
    so let's keep the package for now.
  - debian/patches/series: Do what I meant, and enable build-options.patch
    instead.
* Additional changes:
  - Drop erlang-wx from -et
* Dropped Changes:
  - patches/pcre-crash.patch: CVE-2008-2371: outer level option with
    alternatives caused crash. (Applied Upstream)
  - fix for ssl certificate verification in newSSL: 
    ssl_cacertfile_fix.patch (Applied Upstream)
  - debian/patches/series: Enable native.patch again, to get stripped beam
    files and reduce the package size again. (build-options is what
    actually accomplished this)
  - Remove build-options.patch on advice from upstream and because it caused
    odd build failures.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
%% ``The contents of this file are subject to the Erlang Public License,
 
2
%% Version 1.1, (the "License"); you may not use this file except in
 
3
%% compliance with the License. You should have received a copy of the
 
4
%% Erlang Public License along with this software. If not, it can be
 
5
%% retrieved via the world wide web at http://www.erlang.org/.
 
6
%% 
 
7
%% Software distributed under the License is distributed on an "AS IS"
 
8
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
 
9
%% the License for the specific language governing rights and limitations
 
10
%% under the License.
 
11
%% 
 
12
%% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
 
13
%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
 
14
%% AB. All Rights Reserved.''
 
15
%% 
 
16
%%     $Id: mnesia_controller.erl,v 1.3 2010/03/04 13:54:19 maria Exp $
 
17
%%
 
18
%% The mnesia_init process loads tables from local disc or from
 
19
%% another nodes. It also coordinates updates of the info about
 
20
%% where we can read and write tables.
 
21
%%
 
22
%% Tables may need to be loaded initially at startup of the local
 
23
%% node or when other nodes announces that they already have loaded
 
24
%% tables that we also want.
 
25
%%
 
26
%% Initially we set the load request queue to those tables that we
 
27
%% safely can load locally, i.e. tables where we have the last
 
28
%% consistent replica and we have received mnesia_down from all
 
29
%% other nodes holding the table. Then we let the mnesia_init
 
30
%% process enter its normal working state.
 
31
%%
 
32
%% When we need to load a table we append a request to the load
 
33
%% request queue. All other requests are regarded as high priority
 
34
%% and are processed immediately (e.g. update table whereabouts).
 
35
%% We processes the load request queue as a "background" job..
 
36
 
 
37
-module(mnesia_controller).
 
38
 
 
39
-behaviour(gen_server).
 
40
 
 
41
%% Mnesia internal stuff
 
42
-export([
 
43
         start/0,
 
44
         i_have_tab/1,
 
45
         info/0,
 
46
         get_info/1,
 
47
         get_workers/1,
 
48
         force_load_table/1,
 
49
         async_dump_log/1,
 
50
         sync_dump_log/1,
 
51
         connect_nodes/1,
 
52
         wait_for_schema_commit_lock/0,
 
53
         release_schema_commit_lock/0,
 
54
         create_table/1,
 
55
         get_disc_copy/1,
 
56
         get_cstructs/0,
 
57
         sync_and_block_table_whereabouts/4,
 
58
         sync_del_table_copy_whereabouts/2,
 
59
         block_table/1,
 
60
         unblock_table/1,
 
61
         block_controller/0,
 
62
         unblock_controller/0,
 
63
         unannounce_add_table_copy/2,
 
64
         master_nodes_updated/2,
 
65
         mnesia_down/1,
 
66
         add_active_replica/2,
 
67
         add_active_replica/3,
 
68
         add_active_replica/4,
 
69
         change_table_access_mode/1,
 
70
         del_active_replica/2,
 
71
         wait_for_tables/2,
 
72
         get_network_copy/2,
 
73
         merge_schema/0,
 
74
         start_remote_sender/4,
 
75
         schedule_late_disc_load/2
 
76
        ]).
 
77
 
 
78
%% gen_server callbacks
 
79
-export([init/1,
 
80
         handle_call/3,
 
81
         handle_cast/2,
 
82
         handle_info/2,
 
83
         terminate/2,
 
84
         code_change/3]).
 
85
 
 
86
%% Module internal stuff
 
87
-export([call/1,
 
88
         cast/1,
 
89
         dump_and_reply/2,
 
90
         load_and_reply/2,
 
91
         send_and_reply/2,
 
92
         wait_for_tables_init/2
 
93
        ]).
 
94
 
 
95
-import(mnesia_lib, [set/2, add/2]).
 
96
-import(mnesia_lib, [fatal/2, error/2, verbose/2, dbg_out/2]).
 
97
 
 
98
-include("mnesia.hrl").
 
99
 
 
100
-define(SERVER_NAME, ?MODULE).  
 
101
 
 
102
-record(state, {supervisor,
 
103
                schema_is_merged = false,
 
104
                early_msgs = [],
 
105
                loader_pid,
 
106
                loader_queue = [],
 
107
                sender_pid,
 
108
                sender_queue =  [],
 
109
                late_loader_queue = [],
 
110
                dumper_pid,          % Dumper or schema commit pid
 
111
                dumper_queue = [],   % Dumper or schema commit queue
 
112
                dump_log_timer_ref,
 
113
                is_stopping = false
 
114
               }).
 
115
 
 
116
-record(worker_reply, {what,
 
117
                       pid,
 
118
                       result
 
119
                      }).
 
120
 
 
121
-record(schema_commit_lock, {owner}).
 
122
-record(block_controller, {owner}).
 
123
 
 
124
-record(dump_log, {initiated_by,
 
125
                   opt_reply_to
 
126
                  }).
 
127
 
 
128
-record(net_load, {table,
 
129
                   reason,
 
130
                   opt_reply_to,
 
131
                   cstruct = unknown
 
132
                  }).
 
133
 
 
134
-record(send_table, {table,
 
135
                     receiver_pid,
 
136
                     remote_storage
 
137
                    }).
 
138
 
 
139
-record(disc_load, {table, 
 
140
                    reason,
 
141
                    opt_reply_to
 
142
                   }).
 
143
 
 
144
-record(late_load, {table,
 
145
                    reason,
 
146
                    opt_reply_to,
 
147
                    loaders
 
148
                   }).
 
149
 
 
150
-record(loader_done, {worker_pid,
 
151
                      is_loaded,
 
152
                      table_name,
 
153
                      needs_announce,
 
154
                      needs_sync,
 
155
                      needs_reply,
 
156
                      reply_to,
 
157
                      reply}).
 
158
 
 
159
-record(sender_done, {worker_pid,
 
160
                      worker_res,
 
161
                      table_name
 
162
                     }).
 
163
 
 
164
-record(dumper_done, {worker_pid,
 
165
                      worker_res
 
166
                     }).
 
167
 
 
168
val(Var) ->
 
169
    case ?catch_val(Var) of
 
170
        {'EXIT', Reason} -> mnesia_lib:other_val(Var, Reason); 
 
171
        Value -> Value
 
172
    end.
 
173
 
 
174
start() ->
 
175
    gen_server:start_link({local, ?SERVER_NAME}, ?MODULE, [self()],
 
176
                          [{timeout, infinity}
 
177
                           %% ,{debug, [trace]}
 
178
                          ]).
 
179
 
 
180
sync_dump_log(InitBy) ->
 
181
    call({sync_dump_log, InitBy}).
 
182
 
 
183
async_dump_log(InitBy) ->
 
184
    ?SERVER_NAME ! {async_dump_log, InitBy}.
 
185
    
 
186
%% Wait for tables to be active
 
187
%% If needed, we will wait for Mnesia to start
 
188
%% If Mnesia stops, we will wait for Mnesia to restart
 
189
%% We will wait even if the list of tables is empty
 
190
%%
 
191
wait_for_tables(Tabs, Timeout) when list(Tabs), Timeout == infinity ->
 
192
    do_wait_for_tables(Tabs, Timeout);
 
193
wait_for_tables(Tabs, Timeout) when list(Tabs),
 
194
                                    integer(Timeout), Timeout >= 0 ->
 
195
    do_wait_for_tables(Tabs, Timeout);
 
196
wait_for_tables(Tabs, Timeout) ->
 
197
    {error, {badarg, Tabs, Timeout}}.
 
198
 
 
199
do_wait_for_tables(Tabs, 0) ->
 
200
    reply_wait(Tabs);
 
201
do_wait_for_tables(Tabs, Timeout) ->
 
202
    Pid = spawn_link(?MODULE, wait_for_tables_init, [self(), Tabs]),
 
203
    receive
 
204
        {?SERVER_NAME, Pid, Res} ->
 
205
            Res;
 
206
 
 
207
        {'EXIT', Pid, _} ->
 
208
            reply_wait(Tabs)
 
209
 
 
210
    after Timeout ->
 
211
            unlink(Pid),
 
212
            exit(Pid, timeout),
 
213
            reply_wait(Tabs)
 
214
    end.
 
215
        
 
216
reply_wait(Tabs) ->
 
217
    case catch mnesia_lib:active_tables() of
 
218
        {'EXIT', _} ->
 
219
            {error, {node_not_running, node()}};
 
220
        Active when list(Active) ->
 
221
            case Tabs -- Active of
 
222
                [] ->
 
223
                    ok;
 
224
                BadTabs ->
 
225
                    {timeout, BadTabs}
 
226
            end
 
227
    end.
 
228
 
 
229
wait_for_tables_init(From, Tabs) ->
 
230
    process_flag(trap_exit, true),
 
231
    Res = wait_for_init(From, Tabs, whereis(?SERVER_NAME)),
 
232
    From ! {?SERVER_NAME, self(), Res},
 
233
    unlink(From),
 
234
    exit(normal).
 
235
 
 
236
wait_for_init(From, Tabs, Init) ->
 
237
    case catch link(Init) of
 
238
        {'EXIT', _} ->
 
239
            %% Mnesia is not started
 
240
            {error, {node_not_running, node()}};
 
241
        true when pid(Init) ->
 
242
            cast({sync_tabs, Tabs, self()}),
 
243
            rec_tabs(Tabs, Tabs, From, Init)
 
244
    end.
 
245
 
 
246
sync_reply(Waiter, Tab) ->
 
247
    Waiter ! {?SERVER_NAME, {tab_synced, Tab}}.
 
248
 
 
249
rec_tabs([Tab | Tabs], AllTabs, From, Init) ->
 
250
    receive
 
251
        {?SERVER_NAME, {tab_synced, Tab}} ->
 
252
            rec_tabs(Tabs, AllTabs, From, Init);
 
253
 
 
254
        {'EXIT', From, _} ->
 
255
            %% This will trigger an exit signal
 
256
            %% to mnesia_init
 
257
            exit(wait_for_tables_timeout);
 
258
        
 
259
        {'EXIT', Init, _} ->
 
260
            %% Oops, mnesia_init stopped,
 
261
            exit(mnesia_stopped)
 
262
    end;
 
263
rec_tabs([], _, _, Init) ->
 
264
    unlink(Init),
 
265
    ok.
 
266
 
 
267
get_cstructs() ->
 
268
    call(get_cstructs).
 
269
 
 
270
mnesia_down(Node) ->
 
271
    case cast({mnesia_down, Node}) of
 
272
        {error, _} -> mnesia_monitor:mnesia_down(?SERVER_NAME, Node);
 
273
        _Pid ->  ok
 
274
    end.
 
275
wait_for_schema_commit_lock() ->
 
276
    link(whereis(?SERVER_NAME)),
 
277
    unsafe_call(wait_for_schema_commit_lock).
 
278
 
 
279
block_controller() ->
 
280
    call(block_controller).
 
281
 
 
282
unblock_controller() ->
 
283
    cast(unblock_controller).
 
284
 
 
285
release_schema_commit_lock() ->
 
286
    cast({release_schema_commit_lock, self()}),
 
287
    unlink(whereis(?SERVER_NAME)).
 
288
    
 
289
%% Special for preparation of add table copy
 
290
get_network_copy(Tab, Cs) ->
 
291
    Work = #net_load{table = Tab,
 
292
                     reason = {dumper, add_table_copy},
 
293
                     cstruct = Cs
 
294
                    },
 
295
    Res = (catch load_table(Work)),
 
296
    if Res#loader_done.is_loaded == true ->
 
297
            Tab = Res#loader_done.table_name,
 
298
            case Res#loader_done.needs_announce of
 
299
                true ->
 
300
                    i_have_tab(Tab);
 
301
                false ->
 
302
                    ignore
 
303
            end;
 
304
       true -> ignore
 
305
    end,
 
306
 
 
307
    receive %% Flush copier done message
 
308
        {copier_done, _Node} ->
 
309
            ok
 
310
    after 500 ->  %% avoid hanging if something is wrong and we shall fail.
 
311
            ignore
 
312
    end,
 
313
    Res#loader_done.reply.
 
314
 
 
315
%% This functions is invoked from the dumper
 
316
%% 
 
317
%% There are two cases here:
 
318
%% startup ->
 
319
%%   no need for sync, since mnesia_controller not started yet
 
320
%% schema_trans ->
 
321
%%   already synced with mnesia_controller since the dumper
 
322
%%   is syncronously started from mnesia_controller
 
323
 
 
324
create_table(Tab) ->
 
325
    {loaded, ok} = mnesia_loader:disc_load_table(Tab, {dumper,create_table}).
 
326
 
 
327
get_disc_copy(Tab) ->
 
328
    disc_load_table(Tab, {dumper,change_table_copy_type}, undefined).
 
329
 
 
330
%% Returns ok instead of yes
 
331
force_load_table(Tab) when atom(Tab), Tab /= schema ->
 
332
    case ?catch_val({Tab, storage_type}) of
 
333
        ram_copies ->
 
334
            do_force_load_table(Tab);
 
335
        disc_copies ->
 
336
            do_force_load_table(Tab);
 
337
        disc_only_copies ->
 
338
            do_force_load_table(Tab);
 
339
        unknown ->
 
340
            set({Tab, load_by_force}, true),
 
341
            cast({force_load_updated, Tab}),
 
342
            wait_for_tables([Tab], infinity);
 
343
        {'EXIT', _} ->
 
344
            {error, {no_exists, Tab}}
 
345
    end;
 
346
force_load_table(Tab) ->
 
347
    {error, {bad_type, Tab}}.
 
348
    
 
349
do_force_load_table(Tab) ->
 
350
    Loaded = ?catch_val({Tab, load_reason}),
 
351
    case Loaded of
 
352
        unknown -> 
 
353
            set({Tab, load_by_force}, true),
 
354
            mnesia_late_loader:async_late_disc_load(node(), [Tab], forced_by_user),
 
355
            wait_for_tables([Tab], infinity);
 
356
        {'EXIT', _} ->
 
357
            set({Tab, load_by_force}, true),
 
358
            mnesia_late_loader:async_late_disc_load(node(), [Tab], forced_by_user),
 
359
            wait_for_tables([Tab], infinity);
 
360
        _ ->
 
361
            ok
 
362
    end.    
 
363
master_nodes_updated(schema, _Masters) ->
 
364
    ignore;
 
365
master_nodes_updated(Tab, Masters) ->
 
366
    cast({master_nodes_updated, Tab, Masters}).
 
367
 
 
368
schedule_late_disc_load(Tabs, Reason) ->
 
369
    MsgTag = late_disc_load,
 
370
    try_schedule_late_disc_load(Tabs, Reason, MsgTag).
 
371
 
 
372
try_schedule_late_disc_load(Tabs, _Reason, MsgTag)
 
373
  when Tabs == [], MsgTag /= schema_is_merged ->
 
374
    ignore;
 
375
try_schedule_late_disc_load(Tabs, Reason, MsgTag) ->
 
376
    GetIntents =
 
377
        fun() ->
 
378
                Item = mnesia_late_disc_load,
 
379
                Nodes = val({current, db_nodes}),
 
380
                mnesia:lock({global, Item, Nodes}, write),
 
381
                case multicall(Nodes -- [node()], disc_load_intents) of
 
382
                    {Replies, []} ->
 
383
                        call({MsgTag, Tabs, Reason, Replies}),
 
384
                        done;
 
385
                    {_, BadNodes} ->
 
386
                        %% Some nodes did not respond, lets try again
 
387
                        {retry, BadNodes}
 
388
                end
 
389
        end,
 
390
    case mnesia:transaction(GetIntents) of
 
391
        {'atomic', done} ->
 
392
            done;
 
393
        {'atomic', {retry, BadNodes}} ->
 
394
            verbose("Retry late_load_tables because bad nodes: ~p~n",
 
395
                    [BadNodes]),
 
396
            try_schedule_late_disc_load(Tabs, Reason, MsgTag);
 
397
        {aborted, AbortReason} ->
 
398
            fatal("Cannot late_load_tables~p: ~p~n",
 
399
                  [[Tabs, Reason, MsgTag], AbortReason])
 
400
    end.
 
401
 
 
402
connect_nodes(Ns) ->    
 
403
    case mnesia:system_info(is_running) of 
 
404
        no ->
 
405
            {error, {node_not_running, node()}};
 
406
        yes ->             
 
407
            {NewC, OldC} = mnesia_recover:connect_nodes(Ns),
 
408
            Connected = NewC ++OldC,
 
409
            New1 = mnesia_lib:intersect(Ns, Connected),
 
410
            New = New1 -- val({current, db_nodes}),
 
411
            
 
412
            case try_merge_schema(New) of
 
413
                ok -> 
 
414
                    mnesia_lib:add_list(extra_db_nodes, New),
 
415
                    {ok, New};
 
416
                {aborted, {throw, Str}} when list(Str) ->
 
417
                    %%mnesia_recover:disconnect_nodes(New),
 
418
                    {error, {merge_schema_failed, lists:flatten(Str)}};
 
419
                Else ->
 
420
                    %% Unconnect nodes where merge failed!!
 
421
                    %% mnesia_recover:disconnect_nodes(New),
 
422
                    {error, Else}
 
423
            end
 
424
    end.
 
425
 
 
426
%% Merge the local schema with the schema on other nodes.
 
427
%% But first we must let all processes that want to force
 
428
%% load tables wait until the schema merge is done.
 
429
 
 
430
merge_schema() ->
 
431
    AllNodes = mnesia_lib:all_nodes(),
 
432
    case try_merge_schema(AllNodes) of
 
433
        ok -> 
 
434
            schema_is_merged();
 
435
        {aborted, {throw, Str}} when list(Str) ->
 
436
            fatal("Failed to merge schema: ~s~n", [Str]);
 
437
        Else ->
 
438
            fatal("Failed to merge schema: ~p~n", [Else])
 
439
    end.
 
440
 
 
441
try_merge_schema(Nodes) ->
 
442
    case mnesia_schema:merge_schema() of
 
443
        {'atomic', not_merged} ->
 
444
            %% No more nodes that we need to merge the schema with
 
445
            ok;
 
446
        {'atomic', {merged, OldFriends, NewFriends}} ->
 
447
            %% Check if new nodes has been added to the schema
 
448
            Diff = mnesia_lib:all_nodes() -- [node() | Nodes],
 
449
            mnesia_recover:connect_nodes(Diff),
 
450
 
 
451
            %% Tell everybody to adopt orphan tables
 
452
            im_running(OldFriends, NewFriends),
 
453
            im_running(NewFriends, OldFriends),
 
454
            
 
455
            try_merge_schema(Nodes);
 
456
        {'atomic', {"Cannot get cstructs", Node, Reason}} ->
 
457
            dbg_out("Cannot get cstructs, Node ~p ~p~n", [Node, Reason]),
 
458
            timer:sleep(1000), % Avoid a endless loop look alike            
 
459
            try_merge_schema(Nodes);
 
460
        Other ->
 
461
            Other
 
462
    end.
 
463
 
 
464
im_running(OldFriends, NewFriends) ->
 
465
    abcast(OldFriends, {im_running, node(), NewFriends}).
 
466
 
 
467
schema_is_merged() ->
 
468
    MsgTag = schema_is_merged,
 
469
    SafeLoads = initial_safe_loads(),
 
470
    
 
471
    %% At this point we do not know anything about
 
472
    %% which tables that the other nodes already
 
473
    %% has loaded and therefore we let the normal
 
474
    %% processing of the loader_queue take care
 
475
    %% of it, since we at that time point will
 
476
    %% know the whereabouts. We rely on the fact
 
477
    %% that all nodes tells each other directly
 
478
    %% when they have loaded a table and are
 
479
    %% willing to share it.
 
480
    
 
481
    try_schedule_late_disc_load(SafeLoads, initial, MsgTag).
 
482
 
 
483
 
 
484
cast(Msg) ->
 
485
    case whereis(?SERVER_NAME) of
 
486
        undefined ->{error, {node_not_running, node()}};
 
487
        Pid ->  gen_server:cast(Pid, Msg)
 
488
    end.
 
489
 
 
490
abcast(Nodes, Msg) ->
 
491
    gen_server:abcast(Nodes, ?SERVER_NAME, Msg).
 
492
 
 
493
unsafe_call(Msg) ->
 
494
    case whereis(?SERVER_NAME) of
 
495
        undefined -> {error, {node_not_running, node()}};
 
496
        Pid -> gen_server:call(Pid, Msg, infinity)
 
497
    end.
 
498
 
 
499
call(Msg) ->
 
500
    case whereis(?SERVER_NAME) of
 
501
        undefined ->
 
502
            {error, {node_not_running, node()}};
 
503
        Pid ->
 
504
            link(Pid),
 
505
            Res = gen_server:call(Pid, Msg, infinity),
 
506
            unlink(Pid),
 
507
 
 
508
            %% We get an exit signal if server dies
 
509
            receive
 
510
                {'EXIT', Pid, _Reason} ->
 
511
                    {error, {node_not_running, node()}}
 
512
            after 0 ->
 
513
                    ignore
 
514
            end,
 
515
            Res
 
516
    end.
 
517
 
 
518
remote_call(Node, Func, Args) ->
 
519
    case catch gen_server:call({?MODULE, Node}, {Func, Args, self()}, infinity) of
 
520
        {'EXIT', Error} ->
 
521
            {error, Error};
 
522
        Else ->
 
523
            Else
 
524
    end.
 
525
    
 
526
multicall(Nodes, Msg) ->
 
527
    {Good, Bad} = gen_server:multi_call(Nodes, ?MODULE, Msg, infinity),
 
528
    PatchedGood = [Reply || {_Node, Reply} <- Good],
 
529
    {PatchedGood, Bad}.  %% Make the replies look like rpc:multicalls..
 
530
%%    rpc:multicall(Nodes, ?MODULE, call, [Msg]).
 
531
 
 
532
%%%----------------------------------------------------------------------
 
533
%%% Callback functions from gen_server
 
534
%%%----------------------------------------------------------------------
 
535
 
 
536
%%----------------------------------------------------------------------
 
537
%% Func: init/1
 
538
%% Returns: {ok, State}          |
 
539
%%          {ok, State, Timeout} |
 
540
%%          {stop, Reason}
 
541
%%----------------------------------------------------------------------
 
542
init([Parent]) ->
 
543
    process_flag(trap_exit, true),
 
544
    mnesia_lib:verbose("~p starting: ~p~n", [?SERVER_NAME, self()]),
 
545
 
 
546
    %% Handshake and initialize transaction recovery
 
547
    %% for new nodes detected in the schema
 
548
    All = mnesia_lib:all_nodes(),
 
549
    Diff = All -- [node() | val(original_nodes)],
 
550
    mnesia_lib:unset(original_nodes),
 
551
    mnesia_recover:connect_nodes(Diff),
 
552
 
 
553
    Interval = mnesia_monitor:get_env(dump_log_time_threshold),
 
554
    Msg = {async_dump_log, time_threshold},
 
555
    {ok, Ref} = timer:send_interval(Interval, Msg),
 
556
    mnesia_dumper:start_regulator(),
 
557
    
 
558
    {ok, #state{supervisor = Parent, dump_log_timer_ref = Ref}}.
 
559
 
 
560
%%----------------------------------------------------------------------
 
561
%% Func: handle_call/3
 
562
%% Returns: {reply, Reply, State}          |
 
563
%%          {reply, Reply, State, Timeout} |
 
564
%%          {noreply, State}               |
 
565
%%          {noreply, State, Timeout}      |
 
566
%%          {stop, Reason, Reply, State}   | (terminate/2 is called)
 
567
%%          {stop, Reason, Reply, State}     (terminate/2 is called)
 
568
%%----------------------------------------------------------------------
 
569
 
 
570
handle_call({sync_dump_log, InitBy}, From, State) ->
 
571
    Worker = #dump_log{initiated_by = InitBy,
 
572
                       opt_reply_to = From
 
573
                      },
 
574
    State2 = add_worker(Worker, State),
 
575
    noreply(State2);
 
576
 
 
577
handle_call(wait_for_schema_commit_lock, From, State) ->
 
578
    Worker = #schema_commit_lock{owner = From},
 
579
    State2 = add_worker(Worker, State),
 
580
    noreply(State2);
 
581
 
 
582
handle_call(block_controller, From, State) ->
 
583
    Worker = #block_controller{owner = From},
 
584
    State2 = add_worker(Worker, State),
 
585
    noreply(State2);
 
586
 
 
587
 
 
588
handle_call(get_cstructs, From, State) ->
 
589
    Tabs = val({schema, tables}),
 
590
    Cstructs = [val({T, cstruct}) || T <- Tabs],
 
591
    Running = val({current, db_nodes}),
 
592
    reply(From, {cstructs, Cstructs, Running}),
 
593
    noreply(State);
 
594
 
 
595
handle_call({schema_is_merged, TabsR, Reason, RemoteLoaders}, From, State) ->
 
596
    State2 = late_disc_load(TabsR, Reason, RemoteLoaders, From, State),
 
597
 
 
598
    %% Handle early messages
 
599
    Msgs = State2#state.early_msgs,
 
600
    State3 = State2#state{early_msgs = [], schema_is_merged = true},
 
601
    Ns = val({current, db_nodes}),
 
602
    dbg_out("Schema is merged ~w, State ~w~n", [Ns, State3]),    
 
603
%%    dbg_out("handle_early_msgs ~p ~n", [Msgs]), % qqqq
 
604
    handle_early_msgs(lists:reverse(Msgs), State3);
 
605
 
 
606
handle_call(disc_load_intents, From, State) ->
 
607
    Tabs = disc_load_intents(State#state.loader_queue) ++
 
608
           disc_load_intents(State#state.late_loader_queue),
 
609
    ActiveTabs = mnesia_lib:local_active_tables(),
 
610
    reply(From, {ok, node(), mnesia_lib:union(Tabs, ActiveTabs)}),
 
611
    noreply(State);
 
612
 
 
613
handle_call({update_where_to_write, [add, Tab, AddNode], _From}, _Dummy, State) ->
 
614
%%%    dbg_out("update_w2w ~p", [[add, Tab, AddNode]]), %%% qqqq
 
615
    Current = val({current, db_nodes}),
 
616
    Res = 
 
617
        case lists:member(AddNode, Current) and 
 
618
            State#state.schema_is_merged == true of
 
619
            true ->
 
620
                mnesia_lib:add({Tab, where_to_write}, AddNode);
 
621
            false ->
 
622
                ignore
 
623
        end,
 
624
    {reply, Res, State};
 
625
 
 
626
handle_call({add_active_replica, [Tab, ToNode, RemoteS, AccessMode], From},
 
627
            ReplyTo, State) ->
 
628
    KnownNode = lists:member(ToNode, val({current, db_nodes})),
 
629
    Merged = State#state.schema_is_merged,
 
630
    if
 
631
        KnownNode == false ->
 
632
            reply(ReplyTo, ignore),
 
633
            noreply(State);
 
634
        Merged == true ->
 
635
            Res = add_active_replica(Tab, ToNode, RemoteS, AccessMode),
 
636
            reply(ReplyTo, Res),
 
637
            noreply(State);
 
638
        true -> %% Schema is not merged
 
639
            Msg = {add_active_replica, [Tab, ToNode, RemoteS, AccessMode], From},
 
640
            Msgs = State#state.early_msgs,
 
641
            reply(ReplyTo, ignore),   %% Reply ignore and add data after schema merge
 
642
            noreply(State#state{early_msgs = [{call, Msg, undefined} | Msgs]})
 
643
    end;
 
644
 
 
645
handle_call({unannounce_add_table_copy, [Tab, Node], From}, ReplyTo, State) ->    
 
646
    KnownNode = lists:member(node(From), val({current, db_nodes})),
 
647
    Merged = State#state.schema_is_merged,
 
648
    if
 
649
        KnownNode == false ->
 
650
            reply(ReplyTo, ignore),
 
651
            noreply(State);
 
652
        Merged == true ->
 
653
            Res = unannounce_add_table_copy(Tab, Node),
 
654
            reply(ReplyTo, Res),
 
655
            noreply(State);
 
656
        true -> %% Schema is not merged
 
657
            Msg = {unannounce_add_table_copy, [Tab, Node], From},
 
658
            Msgs = State#state.early_msgs,
 
659
            reply(ReplyTo, ignore),   %% Reply ignore and add data after schema merge
 
660
            %% Set ReplyTO to undefined so we don't reply twice
 
661
            noreply(State#state{early_msgs = [{call, Msg, undefined} | Msgs]})
 
662
    end;
 
663
 
 
664
handle_call(Msg, From, State) when State#state.schema_is_merged == false ->
 
665
    %% Buffer early messages
 
666
%%    dbg_out("Buffered early msg ~p ~n", [Msg]),   %% qqqq
 
667
    Msgs = State#state.early_msgs,
 
668
    noreply(State#state{early_msgs = [{call, Msg, From} | Msgs]});
 
669
 
 
670
handle_call({net_load, Tab, Cs}, From, State) ->
 
671
    Worker = #net_load{table = Tab,
 
672
                       opt_reply_to = From,
 
673
                       reason = add_table_copy,
 
674
                       cstruct = Cs
 
675
                      },
 
676
    State2 = add_worker(Worker, State),
 
677
    noreply(State2);
 
678
 
 
679
handle_call({late_disc_load, Tabs, Reason, RemoteLoaders}, From, State) ->
 
680
    State2 = late_disc_load(Tabs, Reason, RemoteLoaders, From, State),
 
681
    noreply(State2);
 
682
 
 
683
handle_call({block_table, [Tab], From}, _Dummy, State) ->
 
684
    case lists:member(node(From), val({current, db_nodes})) of
 
685
        true ->
 
686
            block_table(Tab);
 
687
        false ->
 
688
            ignore
 
689
    end,
 
690
    {reply, ok, State};
 
691
 
 
692
handle_call({check_w2r, _Node, Tab}, _From, State) ->
 
693
    {reply, val({Tab, where_to_read}), State};
 
694
 
 
695
handle_call(Msg, _From, State) ->
 
696
    error("~p got unexpected call: ~p~n", [?SERVER_NAME, Msg]),
 
697
    noreply(State).
 
698
 
 
699
disc_load_intents([H | T]) when record(H, disc_load) ->
 
700
    [H#disc_load.table | disc_load_intents(T)];
 
701
disc_load_intents([H | T]) when record(H, late_load) ->
 
702
    [H#late_load.table | disc_load_intents(T)];
 
703
disc_load_intents( [H | T]) when record(H, net_load) ->
 
704
    disc_load_intents(T);
 
705
disc_load_intents([]) ->
 
706
    [].
 
707
 
 
708
late_disc_load(TabsR, Reason, RemoteLoaders, From, State) ->
 
709
    verbose("Intend to load tables: ~p~n", [TabsR]),
 
710
    ?eval_debug_fun({?MODULE, late_disc_load},
 
711
                    [{tabs, TabsR}, 
 
712
                     {reason, Reason},
 
713
                     {loaders, RemoteLoaders}]),
 
714
 
 
715
    reply(From, queued),
 
716
    %% RemoteLoaders is a list of {ok, Node, Tabs} tuples
 
717
 
 
718
    %% Remove deleted tabs
 
719
    LocalTabs = mnesia_lib:val({schema, local_tables}),
 
720
    Filter = fun({Tab, Reas}, Acc) -> 
 
721
                     case lists:member(Tab, LocalTabs) of
 
722
                         true -> [{Tab, Reas} | Acc];
 
723
                         false -> Acc
 
724
                     end;
 
725
                (Tab, Acc) ->
 
726
                     case lists:member(Tab, LocalTabs) of
 
727
                         true -> [Tab | Acc];
 
728
                         false -> Acc
 
729
                     end
 
730
             end,
 
731
    
 
732
    Tabs = lists:foldl(Filter, [], TabsR),
 
733
    
 
734
    Nodes = val({current, db_nodes}),
 
735
    LateLoaders = late_loaders(Tabs, Reason, RemoteLoaders, Nodes),
 
736
    LateQueue = State#state.late_loader_queue ++ LateLoaders,
 
737
    State#state{late_loader_queue = LateQueue}.
 
738
 
 
739
late_loaders([{Tab, Reason} | Tabs], DefaultReason, RemoteLoaders, Nodes) ->
 
740
    LoadNodes = late_load_filter(RemoteLoaders, Tab, Nodes, []),
 
741
    case LoadNodes of
 
742
        [] ->
 
743
            cast({disc_load, Tab, Reason}); % Ugly cast
 
744
        _ ->
 
745
            ignore
 
746
    end,
 
747
    LateLoad = #late_load{table = Tab, loaders = LoadNodes, reason = Reason},
 
748
    [LateLoad | late_loaders(Tabs, DefaultReason, RemoteLoaders, Nodes)];
 
749
 
 
750
late_loaders([Tab | Tabs], Reason, RemoteLoaders, Nodes) ->
 
751
    Loaders = late_load_filter(RemoteLoaders, Tab, Nodes, []),
 
752
    case Loaders of
 
753
        [] ->
 
754
            cast({disc_load, Tab, Reason});  % Ugly cast
 
755
        _ ->
 
756
            ignore
 
757
    end,
 
758
    LateLoad = #late_load{table = Tab, loaders = Loaders, reason = Reason},
 
759
    [LateLoad | late_loaders(Tabs, Reason, RemoteLoaders, Nodes)];
 
760
late_loaders([], _Reason, _RemoteLoaders, _Nodes) ->
 
761
    [].
 
762
 
 
763
late_load_filter([{error, _} | RemoteLoaders], Tab, Nodes, Acc) ->
 
764
    late_load_filter(RemoteLoaders, Tab, Nodes, Acc);
 
765
late_load_filter([{badrpc, _} | RemoteLoaders], Tab, Nodes, Acc) ->
 
766
    late_load_filter(RemoteLoaders, Tab, Nodes, Acc);
 
767
late_load_filter([RL | RemoteLoaders], Tab, Nodes, Acc) ->
 
768
    {ok, Node, Intents} = RL,
 
769
    Access = val({Tab, access_mode}),
 
770
    LocalC = val({Tab, local_content}),
 
771
    StillActive = lists:member(Node, Nodes),
 
772
    RemoteIntent = lists:member(Tab, Intents),
 
773
    if
 
774
        Access == read_write,
 
775
        LocalC == false,
 
776
        StillActive == true,
 
777
        RemoteIntent == true ->
 
778
            Masters = mnesia_recover:get_master_nodes(Tab),
 
779
            case lists:member(Node, Masters) of
 
780
                true ->
 
781
                    %% The other node is master node for
 
782
                    %% the table, accept his load intent
 
783
                    late_load_filter(RemoteLoaders, Tab, Nodes, [Node | Acc]);
 
784
                false when Masters == [] ->
 
785
                    %% The table has no master nodes
 
786
                    %% accept his load intent
 
787
                    late_load_filter(RemoteLoaders, Tab, Nodes, [Node | Acc]);
 
788
                false ->
 
789
                    %% Some one else is master node for
 
790
                    %% the table, ignore his load intent
 
791
                    late_load_filter(RemoteLoaders, Tab, Nodes, Acc)
 
792
            end;
 
793
        true ->
 
794
            late_load_filter(RemoteLoaders, Tab, Nodes, Acc)
 
795
    end;
 
796
late_load_filter([], _Tab, _Nodes, Acc) ->
 
797
    Acc.
 
798
    
 
799
%%----------------------------------------------------------------------
 
800
%% Func: handle_cast/2
 
801
%% Returns: {noreply, State}          |
 
802
%%          {noreply, State, Timeout} |
 
803
%%          {stop, Reason, State}            (terminate/2 is called)
 
804
%%----------------------------------------------------------------------
 
805
 
 
806
handle_cast({release_schema_commit_lock, _Owner}, State) ->
 
807
    if
 
808
        State#state.is_stopping == true ->
 
809
            {stop, shutdown, State};
 
810
        true -> 
 
811
            case State#state.dumper_queue of
 
812
                [#schema_commit_lock{}|Rest] ->
 
813
                    [_Worker | Rest] = State#state.dumper_queue,
 
814
                    State2 = State#state{dumper_pid = undefined,
 
815
                                         dumper_queue = Rest},
 
816
                    State3 = opt_start_worker(State2),
 
817
                    noreply(State3);
 
818
                _ ->
 
819
                    noreply(State)
 
820
            end
 
821
    end;
 
822
 
 
823
handle_cast(unblock_controller, State) ->
 
824
    if
 
825
        State#state.is_stopping == true ->
 
826
            {stop, shutdown, State};
 
827
        record(hd(State#state.dumper_queue), block_controller) ->
 
828
            [_Worker | Rest] = State#state.dumper_queue,
 
829
            State2 = State#state{dumper_pid = undefined,
 
830
                                 dumper_queue = Rest},
 
831
            State3 = opt_start_worker(State2),      
 
832
            noreply(State3)
 
833
    end;
 
834
 
 
835
handle_cast({mnesia_down, Node}, State) ->
 
836
    maybe_log_mnesia_down(Node),
 
837
    mnesia_lib:del({current, db_nodes}, Node),
 
838
    mnesia_checkpoint:tm_mnesia_down(Node),
 
839
    Alltabs = val({schema, tables}),
 
840
    State2 = reconfigure_tables(Node, State, Alltabs),
 
841
    case State#state.sender_pid of
 
842
        undefined -> ignore;
 
843
        Pid when pid(Pid) -> Pid ! {copier_done, Node}
 
844
    end,
 
845
    case State#state.loader_pid of
 
846
        undefined -> ignore;
 
847
        Pid2 when pid(Pid2) -> Pid2 ! {copier_done, Node}
 
848
    end,
 
849
    NewSenders = 
 
850
        case State#state.sender_queue of
 
851
            [OldSender | RestSenders] ->
 
852
                Remove = fun(ST) ->
 
853
                                 node(ST#send_table.receiver_pid) /= Node
 
854
                         end,
 
855
                NewS = lists:filter(Remove, RestSenders),
 
856
                %% Keep old sender it will be removed by sender_done
 
857
                [OldSender | NewS];
 
858
            [] ->
 
859
                []
 
860
        end,
 
861
    Early = remove_early_messages(State2#state.early_msgs, Node),
 
862
    mnesia_monitor:mnesia_down(?SERVER_NAME, Node),
 
863
    noreply(State2#state{sender_queue = NewSenders, early_msgs = Early});
 
864
 
 
865
handle_cast({im_running, _Node, NewFriends}, State) ->
 
866
    Tabs = mnesia_lib:local_active_tables() -- [schema],
 
867
    Ns = mnesia_lib:intersect(NewFriends, val({current, db_nodes})),
 
868
    abcast(Ns, {adopt_orphans, node(), Tabs}),
 
869
    noreply(State);
 
870
 
 
871
handle_cast(Msg, State) when State#state.schema_is_merged == false ->
 
872
    %% Buffer early messages
 
873
    Msgs = State#state.early_msgs,
 
874
    noreply(State#state{early_msgs = [{cast, Msg} | Msgs]});
 
875
 
 
876
handle_cast({disc_load, Tab, Reason}, State) ->
 
877
    Worker = #disc_load{table = Tab, reason = Reason},
 
878
    State2 = add_worker(Worker, State),
 
879
    noreply(State2);
 
880
 
 
881
handle_cast(Worker, State) when record(Worker, send_table) ->
 
882
    State2 = add_worker(Worker, State),
 
883
    noreply(State2);
 
884
 
 
885
handle_cast({sync_tabs, Tabs, From}, State) ->
 
886
    %% user initiated wait_for_tables
 
887
    handle_sync_tabs(Tabs, From),
 
888
    noreply(State);
 
889
 
 
890
handle_cast({i_have_tab, Tab, Node}, State) ->
 
891
    case lists:member(Node, val({current, db_nodes})) of
 
892
        true -> 
 
893
            State2 = node_has_tabs([Tab], Node, State),
 
894
            noreply(State2);
 
895
        false ->
 
896
            noreply(State)
 
897
    end;
 
898
 
 
899
handle_cast({force_load_updated, Tab}, State) ->
 
900
    case val({Tab, active_replicas}) of
 
901
        [] ->
 
902
            %% No valid replicas
 
903
            noreply(State);
 
904
        [SomeNode | _] ->
 
905
            State2 = node_has_tabs([Tab], SomeNode, State),
 
906
            noreply(State2)
 
907
    end;
 
908
    
 
909
handle_cast({master_nodes_updated, Tab, Masters}, State) ->
 
910
    Active = val({Tab, active_replicas}),
 
911
    Valid = 
 
912
        case val({Tab, load_by_force}) of
 
913
            true ->
 
914
                Active;
 
915
            false ->
 
916
                if
 
917
                    Masters == [] ->
 
918
                        Active;
 
919
                    true ->
 
920
                        mnesia_lib:intersect(Masters, Active)
 
921
                end
 
922
        end,
 
923
    case Valid of
 
924
        [] ->
 
925
            %% No valid replicas
 
926
            noreply(State);
 
927
        [SomeNode | _] ->
 
928
            State2 = node_has_tabs([Tab], SomeNode, State),
 
929
            noreply(State2)
 
930
    end;
 
931
    
 
932
handle_cast({adopt_orphans, Node, Tabs}, State) ->
 
933
 
 
934
    State2 = node_has_tabs(Tabs, Node, State),
 
935
    
 
936
    %% Register the other node as up and running
 
937
    mnesia_recover:log_mnesia_up(Node),
 
938
    verbose("Logging mnesia_up ~w~n", [Node]),
 
939
    mnesia_lib:report_system_event({mnesia_up, Node}),
 
940
    
 
941
    %% Load orphan tables
 
942
    LocalTabs = val({schema, local_tables}) -- [schema],
 
943
    Nodes = val({current, db_nodes}),
 
944
    {LocalOrphans, RemoteMasters} =
 
945
        orphan_tables(LocalTabs, Node, Nodes, [], []),
 
946
    Reason = {adopt_orphan, node()},
 
947
    mnesia_late_loader:async_late_disc_load(node(), LocalOrphans, Reason),
 
948
    
 
949
    Fun =
 
950
        fun(N) ->
 
951
                RemoteOrphans =
 
952
                    [Tab || {Tab, Ns} <- RemoteMasters,
 
953
                            lists:member(N, Ns)],
 
954
                mnesia_late_loader:maybe_async_late_disc_load(N, RemoteOrphans, Reason)
 
955
          end,
 
956
    lists:foreach(Fun, Nodes),
 
957
    
 
958
    Queue = State2#state.loader_queue,
 
959
    State3 = State2#state{loader_queue = Queue},
 
960
    noreply(State3);
 
961
 
 
962
handle_cast(Msg, State) ->
 
963
    error("~p got unexpected cast: ~p~n", [?SERVER_NAME, Msg]),
 
964
    noreply(State).
 
965
 
 
966
handle_sync_tabs([Tab | Tabs], From) ->
 
967
    case val({Tab, where_to_read}) of
 
968
        nowhere ->
 
969
            case get({sync_tab, Tab}) of
 
970
                undefined ->
 
971
                    put({sync_tab, Tab}, [From]);
 
972
                Pids ->
 
973
                    put({sync_tab, Tab}, [From | Pids])
 
974
            end;
 
975
        _ ->
 
976
            sync_reply(From, Tab)
 
977
    end,
 
978
    handle_sync_tabs(Tabs, From);
 
979
handle_sync_tabs([], _From) ->
 
980
    ok.
 
981
 
 
982
%%----------------------------------------------------------------------
 
983
%% Func: handle_info/2
 
984
%% Returns: {noreply, State}          |
 
985
%%          {noreply, State, Timeout} |
 
986
%%          {stop, Reason, State}            (terminate/2 is called)
 
987
%%----------------------------------------------------------------------
 
988
 
 
989
handle_info({async_dump_log, InitBy}, State) ->
 
990
    Worker = #dump_log{initiated_by = InitBy},
 
991
    State2 = add_worker(Worker, State),
 
992
    noreply(State2);
 
993
 
 
994
handle_info(Done, State) when record(Done, dumper_done) ->
 
995
    Pid = Done#dumper_done.worker_pid,
 
996
    Res = Done#dumper_done.worker_res,
 
997
    if
 
998
        State#state.is_stopping == true ->
 
999
            {stop, shutdown, State};
 
1000
        Res == dumped, Pid == State#state.dumper_pid ->
 
1001
            [Worker | Rest] = State#state.dumper_queue,
 
1002
            reply(Worker#dump_log.opt_reply_to, Res),
 
1003
            State2 = State#state{dumper_pid = undefined,
 
1004
                                 dumper_queue = Rest},
 
1005
            State3 = opt_start_worker(State2),
 
1006
            noreply(State3);
 
1007
        true ->
 
1008
            fatal("Dumper failed: ~p~n state: ~p~n", [Res, State]),
 
1009
            {stop, fatal, State}
 
1010
    end;
 
1011
 
 
1012
handle_info(Done, State) when record(Done, loader_done) ->
 
1013
    if
 
1014
        %% Assertion
 
1015
        Done#loader_done.worker_pid == State#state.loader_pid -> ok
 
1016
    end,
 
1017
            
 
1018
    [_Worker | Rest] = LoadQ0 = State#state.loader_queue,
 
1019
    LateQueue0 = State#state.late_loader_queue,
 
1020
    {LoadQ, LateQueue} =
 
1021
        case Done#loader_done.is_loaded of
 
1022
            true ->
 
1023
                Tab = Done#loader_done.table_name,
 
1024
                
 
1025
                %% Optional user sync
 
1026
                case Done#loader_done.needs_sync of
 
1027
                    true -> user_sync_tab(Tab);
 
1028
                    false -> ignore
 
1029
                end,
 
1030
                
 
1031
                %% Optional table announcement
 
1032
                case Done#loader_done.needs_announce of
 
1033
                    true ->
 
1034
                        i_have_tab(Tab),
 
1035
                        case Tab of
 
1036
                            schema ->
 
1037
                                ignore;
 
1038
                            _ ->
 
1039
                                %% Local node needs to perform user_sync_tab/1
 
1040
                                Ns = val({current, db_nodes}),
 
1041
                                abcast(Ns, {i_have_tab, Tab, node()})
 
1042
                        end;
 
1043
                    false ->
 
1044
                        case Tab of
 
1045
                            schema ->
 
1046
                                ignore;
 
1047
                            _ ->
 
1048
                                %% Local node needs to perform user_sync_tab/1
 
1049
                                Ns = val({current, db_nodes}),
 
1050
                                AlreadyKnows = val({Tab, active_replicas}),
 
1051
                                abcast(Ns -- AlreadyKnows, {i_have_tab, Tab, node()})
 
1052
                        end            
 
1053
                end,
 
1054
 
 
1055
                %% Optional client reply
 
1056
                case Done#loader_done.needs_reply of
 
1057
                    true ->
 
1058
                        reply(Done#loader_done.reply_to,
 
1059
                              Done#loader_done.reply);
 
1060
                    false ->
 
1061
                        ignore
 
1062
                end,
 
1063
                {Rest, reply_late_load(Tab, LateQueue0)};
 
1064
            false ->
 
1065
                case Done#loader_done.reply of
 
1066
                    restart -> 
 
1067
                        {LoadQ0, LateQueue0};
 
1068
                    _ ->
 
1069
                        {Rest, LateQueue0}
 
1070
                end
 
1071
        end,
 
1072
 
 
1073
    State2 = State#state{loader_pid = undefined,
 
1074
                         loader_queue = LoadQ,
 
1075
                         late_loader_queue = LateQueue},
 
1076
 
 
1077
    State3 = opt_start_worker(State2),
 
1078
    noreply(State3);
 
1079
 
 
1080
handle_info(Done, State) when record(Done, sender_done) ->
 
1081
    Pid = Done#sender_done.worker_pid,
 
1082
    Res = Done#sender_done.worker_res,
 
1083
    if
 
1084
        Res == ok, Pid == State#state.sender_pid ->
 
1085
            [Worker | Rest] = State#state.sender_queue,
 
1086
            Worker#send_table.receiver_pid ! {copier_done, node()},
 
1087
            State2 = State#state{sender_pid = undefined,
 
1088
                                 sender_queue = Rest},
 
1089
            State3 = opt_start_worker(State2),
 
1090
            noreply(State3);
 
1091
        true ->
 
1092
            %% No need to send any message to the table receiver
 
1093
            %% since it will soon get a mnesia_down anyway
 
1094
            fatal("Sender failed: ~p~n state: ~p~n", [Res, State]),
 
1095
            {stop, fatal, State}
 
1096
    end;
 
1097
 
 
1098
handle_info({'EXIT', Pid, R}, State) when Pid == State#state.supervisor ->
 
1099
    catch set(mnesia_status, stopping),
 
1100
    case State#state.dumper_pid of
 
1101
        undefined ->
 
1102
            dbg_out("~p was ~p~n", [?SERVER_NAME, R]),
 
1103
            {stop, shutdown, State};
 
1104
        _ ->
 
1105
            noreply(State#state{is_stopping = true})
 
1106
    end;
 
1107
 
 
1108
handle_info({'EXIT', Pid, R}, State) when Pid == State#state.dumper_pid ->
 
1109
    case State#state.dumper_queue of
 
1110
        [#schema_commit_lock{}|Workers] ->  %% Schema trans crashed or was killed
 
1111
            State2 = State#state{dumper_queue = Workers, dumper_pid = undefined},
 
1112
            State3 = opt_start_worker(State2),
 
1113
            noreply(State3);
 
1114
        _Other ->
 
1115
            fatal("Dumper or schema commit crashed: ~p~n state: ~p~n", [R, State]),
 
1116
            {stop, fatal, State}
 
1117
    end;
 
1118
 
 
1119
handle_info({'EXIT', Pid, R}, State) when Pid == State#state.loader_pid ->
 
1120
    fatal("Loader crashed: ~p~n state: ~p~n", [R, State]),
 
1121
    {stop, fatal, State};
 
1122
 
 
1123
handle_info({'EXIT', Pid, R}, State) when Pid == State#state.sender_pid ->
 
1124
    %% No need to send any message to the table receiver
 
1125
    %% since it will soon get a mnesia_down anyway
 
1126
    fatal("Sender crashed: ~p~n state: ~p~n", [R, State]),
 
1127
    {stop, fatal, State};
 
1128
 
 
1129
handle_info({From, get_state}, State) ->
 
1130
    From ! {?SERVER_NAME, State},
 
1131
    noreply(State);
 
1132
 
 
1133
%% No real need for buffering
 
1134
handle_info(Msg, State) when State#state.schema_is_merged == false ->
 
1135
    %% Buffer early messages
 
1136
    Msgs = State#state.early_msgs,
 
1137
    noreply(State#state{early_msgs = [{info, Msg} | Msgs]});
 
1138
 
 
1139
handle_info({'EXIT', Pid, wait_for_tables_timeout}, State) ->
 
1140
    sync_tab_timeout(Pid, get()),
 
1141
    noreply(State);
 
1142
 
 
1143
handle_info(Msg, State) ->
 
1144
    error("~p got unexpected info: ~p~n", [?SERVER_NAME, Msg]),
 
1145
    noreply(State).
 
1146
 
 
1147
reply_late_load(Tab, [H | T]) when H#late_load.table == Tab ->
 
1148
    reply(H#late_load.opt_reply_to, ok),
 
1149
    reply_late_load(Tab, T);
 
1150
reply_late_load(Tab, [H | T])  ->
 
1151
    [H | reply_late_load(Tab, T)];
 
1152
reply_late_load(_Tab, []) ->
 
1153
    [].
 
1154
 
 
1155
sync_tab_timeout(Pid, [{{sync_tab, Tab}, Pids} | Tail]) ->
 
1156
    case lists:delete(Pid, Pids) of
 
1157
        [] ->
 
1158
            erase({sync_tab, Tab});
 
1159
        Pids2 ->
 
1160
            put({sync_tab, Tab}, Pids2)
 
1161
    end,
 
1162
    sync_tab_timeout(Pid, Tail);
 
1163
sync_tab_timeout(Pid, [_ | Tail]) ->
 
1164
    sync_tab_timeout(Pid, Tail);
 
1165
sync_tab_timeout(_Pid, []) ->
 
1166
    ok.
 
1167
 
 
1168
%% Pick the load record that has the highest load order
 
1169
%% Returns {BestLoad, RemainingQueue} or {none, []} if queue is empty
 
1170
pick_next(Queue) ->
 
1171
    pick_next(Queue, none, none, []).
 
1172
 
 
1173
pick_next([Head | Tail], Load, Order, Rest) when record(Head, net_load) ->
 
1174
    Tab = Head#net_load.table,
 
1175
    select_best(Head, Tail, val({Tab, load_order}), Load, Order, Rest);
 
1176
pick_next([Head | Tail], Load, Order, Rest) when record(Head, disc_load) ->
 
1177
    Tab = Head#disc_load.table,
 
1178
    select_best(Head, Tail, val({Tab, load_order}), Load, Order, Rest);
 
1179
pick_next([], Load, _Order, Rest) ->
 
1180
    {Load, Rest}.
 
1181
 
 
1182
select_best(Load, Tail, Order, none, none, Rest) ->
 
1183
    pick_next(Tail, Load, Order, Rest);
 
1184
select_best(Load, Tail, Order, OldLoad, OldOrder, Rest) when Order > OldOrder ->
 
1185
    pick_next(Tail, Load, Order, [OldLoad | Rest]);
 
1186
select_best(Load, Tail, _Order, OldLoad, OldOrder, Rest) ->
 
1187
    pick_next(Tail, OldLoad, OldOrder, [Load | Rest]).
 
1188
 
 
1189
%%----------------------------------------------------------------------
 
1190
%% Func: terminate/2
 
1191
%% Purpose: Shutdown the server
 
1192
%% Returns: any (ignored by gen_server)
 
1193
%%----------------------------------------------------------------------
 
1194
terminate(Reason, State) ->
 
1195
    mnesia_monitor:terminate_proc(?SERVER_NAME, Reason, State).
 
1196
 
 
1197
%%----------------------------------------------------------------------
 
1198
%% Func: code_change/3
 
1199
%% Purpose: Upgrade process when its code is to be changed
 
1200
%% Returns: {ok, NewState}
 
1201
%%----------------------------------------------------------------------
 
1202
code_change(_OldVsn, State, _Extra) ->
 
1203
    {ok, State}.
 
1204
 
 
1205
%%%----------------------------------------------------------------------
 
1206
%%% Internal functions
 
1207
%%%----------------------------------------------------------------------
 
1208
 
 
1209
maybe_log_mnesia_down(N) ->
 
1210
    %% We use mnesia_down when deciding which tables to load locally,
 
1211
    %% so if we are not running (i.e haven't decided which tables
 
1212
    %% to load locally), don't log mnesia_down yet.
 
1213
    case mnesia_lib:is_running() of
 
1214
        yes -> 
 
1215
            verbose("Logging mnesia_down ~w~n", [N]),
 
1216
            mnesia_recover:log_mnesia_down(N),
 
1217
            ok;
 
1218
        _ ->                
 
1219
            Filter = fun(Tab) ->
 
1220
                             inactive_copy_holders(Tab, N)
 
1221
                     end,
 
1222
            HalfLoadedTabs = lists:any(Filter, val({schema, local_tables}) -- [schema]),
 
1223
            if 
 
1224
                HalfLoadedTabs == true ->
 
1225
                    verbose("Logging mnesia_down ~w~n", [N]),
 
1226
                    mnesia_recover:log_mnesia_down(N),
 
1227
                    ok;         
 
1228
                true ->
 
1229
                    %% Unfortunately we have not loaded some common
 
1230
                    %% tables yet, so we cannot rely on the nodedown
 
1231
                    log_later   %% BUGBUG handle this case!!!
 
1232
            end
 
1233
    end.
 
1234
 
 
1235
inactive_copy_holders(Tab, Node) ->
 
1236
    Cs = val({Tab, cstruct}),
 
1237
    case mnesia_lib:cs_to_storage_type(Node, Cs) of
 
1238
        unknown ->
 
1239
            false;
 
1240
        _Storage ->
 
1241
            mnesia_lib:not_active_here(Tab)
 
1242
    end.
 
1243
 
 
1244
orphan_tables([Tab | Tabs], Node, Ns, Local, Remote) ->
 
1245
    Cs = val({Tab, cstruct}),
 
1246
    CopyHolders = mnesia_lib:copy_holders(Cs),
 
1247
    RamCopyHolders = Cs#cstruct.ram_copies,
 
1248
    DiscCopyHolders = CopyHolders -- RamCopyHolders,
 
1249
    DiscNodes = val({schema, disc_copies}),
 
1250
    LocalContent = Cs#cstruct.local_content,
 
1251
    RamCopyHoldersOnDiscNodes = mnesia_lib:intersect(RamCopyHolders, DiscNodes),
 
1252
    Active = val({Tab, active_replicas}),
 
1253
    case lists:member(Node, DiscCopyHolders) of
 
1254
        true when Active == [] ->
 
1255
            case DiscCopyHolders -- Ns of
 
1256
                [] ->
 
1257
                    %% We're last up and the other nodes have not
 
1258
                    %% loaded the table. Lets load it if we are
 
1259
                    %% the smallest node.
 
1260
                    case lists:min(DiscCopyHolders) of
 
1261
                        Min when Min == node() ->
 
1262
                            case mnesia_recover:get_master_nodes(Tab) of
 
1263
                                [] ->
 
1264
                                    L = [Tab | Local],
 
1265
                                    orphan_tables(Tabs, Node, Ns, L, Remote);
 
1266
                                Masters ->
 
1267
                                    R = [{Tab, Masters} | Remote],
 
1268
                                    orphan_tables(Tabs, Node, Ns, Local, R)
 
1269
                            end;
 
1270
                        _ ->
 
1271
                            orphan_tables(Tabs, Node, Ns, Local, Remote)
 
1272
                    end;
 
1273
                _ ->
 
1274
                    orphan_tables(Tabs, Node, Ns, Local, Remote)
 
1275
            end;
 
1276
        false when Active == [], DiscCopyHolders == [], RamCopyHoldersOnDiscNodes == [] ->
 
1277
            %% Special case when all replicas resides on disc less nodes
 
1278
            orphan_tables(Tabs, Node, Ns, [Tab | Local], Remote);
 
1279
        _ when LocalContent == true ->
 
1280
            orphan_tables(Tabs, Node, Ns, [Tab | Local], Remote);
 
1281
        _ ->
 
1282
            orphan_tables(Tabs, Node, Ns, Local, Remote)
 
1283
    end;
 
1284
orphan_tables([], _, _, LocalOrphans, RemoteMasters) ->
 
1285
    {LocalOrphans, RemoteMasters}.
 
1286
 
 
1287
node_has_tabs([Tab | Tabs], Node, State) when Node /= node() ->
 
1288
    State2 = update_whereabouts(Tab, Node, State),
 
1289
    node_has_tabs(Tabs, Node, State2);
 
1290
node_has_tabs([Tab | Tabs], Node, State) ->
 
1291
    user_sync_tab(Tab),
 
1292
    node_has_tabs(Tabs, Node, State);
 
1293
node_has_tabs([], _Node, State) ->
 
1294
    State.
 
1295
 
 
1296
update_whereabouts(Tab, Node, State) ->
 
1297
    Storage = val({Tab, storage_type}),
 
1298
    Read = val({Tab, where_to_read}),
 
1299
    LocalC = val({Tab, local_content}),
 
1300
    BeingCreated = (?catch_val({Tab, create_table}) == true),
 
1301
    Masters = mnesia_recover:get_master_nodes(Tab),
 
1302
    ByForce = val({Tab, load_by_force}),
 
1303
    GoGetIt =
 
1304
        if
 
1305
            ByForce == true ->
 
1306
                true;
 
1307
            Masters == [] ->
 
1308
                true;
 
1309
            true ->
 
1310
                lists:member(Node, Masters)
 
1311
        end,
 
1312
    
 
1313
    dbg_out("Table ~w is loaded on ~w. s=~w, r=~w, lc=~w, f=~w, m=~w~n",
 
1314
            [Tab, Node, Storage, Read, LocalC, ByForce, GoGetIt]),
 
1315
    if
 
1316
        LocalC == true ->
 
1317
            %% Local contents, don't care about other node
 
1318
            State;
 
1319
        Storage == unknown, Read == nowhere ->
 
1320
            %% No own copy, time to read remotely
 
1321
            %% if the other node is a good node
 
1322
            add_active_replica(Tab, Node),
 
1323
            case GoGetIt of
 
1324
                true ->
 
1325
                    set({Tab, where_to_read}, Node),
 
1326
                    user_sync_tab(Tab),
 
1327
                    State;
 
1328
                false ->
 
1329
                    State
 
1330
            end;
 
1331
        Storage == unknown ->
 
1332
            %% No own copy, continue to read remotely       
 
1333
            add_active_replica(Tab, Node),          
 
1334
            NodeST = mnesia_lib:storage_type_at_node(Node, Tab),
 
1335
            ReadST = mnesia_lib:storage_type_at_node(Read, Tab),
 
1336
            if   %% Avoid reading from disc_only_copies
 
1337
                NodeST == disc_only_copies ->
 
1338
                    ignore;
 
1339
                ReadST == disc_only_copies ->
 
1340
                    mnesia_lib:set_remote_where_to_read(Tab);
 
1341
                true ->
 
1342
                    ignore
 
1343
            end,
 
1344
            user_sync_tab(Tab),
 
1345
            State;
 
1346
        BeingCreated == true ->
 
1347
            %% The table is currently being created
 
1348
            %% and we shall have an own copy of it.
 
1349
            %% We will load the (empty) table locally.
 
1350
            add_active_replica(Tab, Node),
 
1351
            State;
 
1352
        Read == nowhere ->
 
1353
            %% Own copy, go and get a copy of the table
 
1354
            %% if the other node is master or if there
 
1355
            %% are no master at all
 
1356
            add_active_replica(Tab, Node),
 
1357
            case GoGetIt of
 
1358
                true ->
 
1359
                    Worker = #net_load{table = Tab,
 
1360
                                       reason = {active_remote, Node}},
 
1361
                    add_worker(Worker, State);
 
1362
                false ->
 
1363
                    State
 
1364
            end;
 
1365
        true ->
 
1366
            %% We already have an own copy
 
1367
            add_active_replica(Tab, Node),
 
1368
            user_sync_tab(Tab),
 
1369
            State
 
1370
    end.
 
1371
 
 
1372
initial_safe_loads() ->
 
1373
    case val({schema, storage_type}) of
 
1374
        ram_copies ->
 
1375
            Downs = [],
 
1376
            Tabs = val({schema, local_tables}) -- [schema],
 
1377
            LastC = fun(T) -> last_consistent_replica(T, Downs) end,
 
1378
            lists:zf(LastC, Tabs);
 
1379
        
 
1380
        disc_copies ->
 
1381
            Downs = mnesia_recover:get_mnesia_downs(),
 
1382
            dbg_out("mnesia_downs = ~p~n", [Downs]),
 
1383
                
 
1384
            Tabs = val({schema, local_tables}) -- [schema],
 
1385
            LastC = fun(T) -> last_consistent_replica(T, Downs) end,
 
1386
            lists:zf(LastC, Tabs)
 
1387
    end.
 
1388
    
 
1389
last_consistent_replica(Tab, Downs) ->
 
1390
    Cs = val({Tab, cstruct}),
 
1391
    Storage = mnesia_lib:cs_to_storage_type(node(), Cs),
 
1392
    Ram = Cs#cstruct.ram_copies,
 
1393
    Disc = Cs#cstruct.disc_copies,
 
1394
    DiscOnly = Cs#cstruct.disc_only_copies,
 
1395
    BetterCopies0 = mnesia_lib:remote_copy_holders(Cs) -- Downs,
 
1396
    BetterCopies = BetterCopies0 -- Ram,
 
1397
    AccessMode = Cs#cstruct.access_mode,
 
1398
    Copies = mnesia_lib:copy_holders(Cs),
 
1399
    Masters = mnesia_recover:get_master_nodes(Tab),
 
1400
    LocalMaster0 = lists:member(node(), Masters),
 
1401
    LocalContent = Cs#cstruct.local_content,
 
1402
    RemoteMaster =
 
1403
        if
 
1404
            Masters == [] -> false;
 
1405
            true -> not LocalMaster0
 
1406
        end,
 
1407
    LocalMaster =
 
1408
        if
 
1409
            Masters == [] -> false;
 
1410
            true -> LocalMaster0
 
1411
        end,
 
1412
    if
 
1413
        Copies == [node()]  ->
 
1414
            %% Only one copy holder and it is local.
 
1415
            %% It may also be a local contents table
 
1416
            {true, {Tab, local_only}};
 
1417
        LocalContent == true ->
 
1418
            {true, {Tab, local_content}};
 
1419
        LocalMaster == true ->
 
1420
            %% We have a local master
 
1421
            {true, {Tab, local_master}};
 
1422
        RemoteMaster == true ->
 
1423
            %% Wait for remote master copy
 
1424
            false;
 
1425
        Storage == ram_copies ->
 
1426
            if
 
1427
                Disc == [], DiscOnly == [] ->
 
1428
                    %% Nobody has copy on disc
 
1429
                    {true, {Tab, ram_only}};
 
1430
                true ->
 
1431
                    %% Some other node has copy on disc
 
1432
                    false
 
1433
            end;
 
1434
        AccessMode == read_only ->
 
1435
            %% No one has been able to update the table,
 
1436
            %% i.e. all disc resident copies are equal
 
1437
            {true, {Tab, read_only}};
 
1438
        BetterCopies /= [], Masters /= [node()] ->
 
1439
            %% There are better copies on other nodes
 
1440
            %% and we do not have the only master copy
 
1441
            false;
 
1442
        true ->
 
1443
            {true, {Tab, initial}}
 
1444
    end.
 
1445
 
 
1446
reconfigure_tables(N, State, [Tab |Tail]) ->
 
1447
    del_active_replica(Tab, N),
 
1448
    case val({Tab, where_to_read}) of
 
1449
        N ->  mnesia_lib:set_remote_where_to_read(Tab);
 
1450
        _ ->  ignore
 
1451
    end,
 
1452
    LateQ = drop_loaders(Tab, N, State#state.late_loader_queue),
 
1453
    reconfigure_tables(N, State#state{late_loader_queue = LateQ}, Tail);
 
1454
 
 
1455
reconfigure_tables(_, State, []) ->
 
1456
    State.
 
1457
 
 
1458
remove_early_messages([], _Node) ->
 
1459
    [];
 
1460
remove_early_messages([{call, {add_active_replica, [_, Node, _, _], _}, _}|R], Node) ->
 
1461
    remove_early_messages(R, Node); %% Does a reply before queuing
 
1462
remove_early_messages([{call, {block_table, _, From}, ReplyTo}|R], Node) 
 
1463
  when node(From) == Node ->
 
1464
    reply(ReplyTo, ok),  %% Remove gen:server waits..
 
1465
    remove_early_messages(R, Node);
 
1466
remove_early_messages([{cast, {i_have_tab, _Tab, Node}}|R], Node) ->
 
1467
    remove_early_messages(R, Node);
 
1468
remove_early_messages([{cast, {adopt_orphans, Node, _Tabs}}|R], Node) ->
 
1469
    remove_early_messages(R, Node);
 
1470
remove_early_messages([M|R],Node) ->
 
1471
    [M|remove_early_messages(R,Node)].
 
1472
 
 
1473
%% Drop loader from late load queue and possibly trigger a disc_load
 
1474
drop_loaders(Tab, Node, [H | T]) when H#late_load.table == Tab ->
 
1475
    %% Check if it is time to issue a disc_load request
 
1476
    case H#late_load.loaders of
 
1477
        [Node] ->
 
1478
            Reason = {H#late_load.reason, last_loader_down, Node},
 
1479
            cast({disc_load, Tab, Reason});  % Ugly cast
 
1480
        _ ->
 
1481
            ignore
 
1482
    end,
 
1483
    %% Drop the node from the list of loaders
 
1484
    H2 = H#late_load{loaders = H#late_load.loaders -- [Node]},
 
1485
    [H2 | drop_loaders(Tab, Node, T)];
 
1486
drop_loaders(Tab, Node, [H | T]) ->
 
1487
    [H | drop_loaders(Tab, Node, T)];
 
1488
drop_loaders(_, _, []) ->
 
1489
    [].
 
1490
 
 
1491
add_active_replica(Tab, Node) ->
 
1492
    add_active_replica(Tab, Node, val({Tab, cstruct})).
 
1493
 
 
1494
add_active_replica(Tab, Node, Cs) when record(Cs, cstruct) ->
 
1495
    Storage = mnesia_lib:schema_cs_to_storage_type(Node, Cs),
 
1496
    AccessMode = Cs#cstruct.access_mode,
 
1497
    add_active_replica(Tab, Node, Storage, AccessMode).
 
1498
 
 
1499
%% Block table primitives
 
1500
 
 
1501
block_table(Tab) ->
 
1502
    Var = {Tab, where_to_commit},
 
1503
    Old = val(Var),
 
1504
    New = {blocked, Old},
 
1505
    set(Var, New). % where_to_commit
 
1506
 
 
1507
unblock_table(Tab) ->
 
1508
    Var = {Tab, where_to_commit},
 
1509
    New = 
 
1510
        case val(Var) of
 
1511
            {blocked, List} ->
 
1512
                List;
 
1513
            List ->
 
1514
                List
 
1515
        end,
 
1516
    set(Var, New). % where_to_commit
 
1517
 
 
1518
is_tab_blocked(W2C) when list(W2C) ->
 
1519
    {false, W2C};
 
1520
is_tab_blocked({blocked, W2C}) when list(W2C) ->
 
1521
    {true, W2C}.
 
1522
 
 
1523
mark_blocked_tab(true, Value) -> 
 
1524
    {blocked, Value};
 
1525
mark_blocked_tab(false, Value) -> 
 
1526
    Value.
 
1527
 
 
1528
%%
 
1529
 
 
1530
add_active_replica(Tab, Node, Storage, AccessMode) ->
 
1531
    Var = {Tab, where_to_commit},
 
1532
    {Blocked, Old} = is_tab_blocked(val(Var)),
 
1533
    Del = lists:keydelete(Node, 1, Old),
 
1534
    case AccessMode of
 
1535
        read_write ->
 
1536
            New = lists:sort([{Node, Storage} | Del]),
 
1537
            set(Var, mark_blocked_tab(Blocked, New)), % where_to_commit
 
1538
            add({Tab, where_to_write}, Node);
 
1539
        read_only ->
 
1540
            set(Var, mark_blocked_tab(Blocked, Del)),
 
1541
            mnesia_lib:del({Tab, where_to_write}, Node)
 
1542
    end,
 
1543
    add({Tab, active_replicas}, Node).
 
1544
 
 
1545
del_active_replica(Tab, Node) ->
 
1546
    Var = {Tab, where_to_commit},
 
1547
    {Blocked, Old} = is_tab_blocked(val(Var)),
 
1548
    Del = lists:keydelete(Node, 1, Old),
 
1549
    New = lists:sort(Del),
 
1550
    set(Var, mark_blocked_tab(Blocked, New)),      % where_to_commit
 
1551
    mnesia_lib:del({Tab, active_replicas}, Node),
 
1552
    mnesia_lib:del({Tab, where_to_write}, Node).
 
1553
 
 
1554
change_table_access_mode(Cs) ->
 
1555
    Tab = Cs#cstruct.name,
 
1556
    lists:foreach(fun(N) -> add_active_replica(Tab, N, Cs) end,
 
1557
                  val({Tab, active_replicas})).
 
1558
 
 
1559
%% node To now has tab loaded, but this must be undone
 
1560
%% This code is rpc:call'ed from the tab_copier process
 
1561
%% when it has *not* released it's table lock
 
1562
unannounce_add_table_copy(Tab, To) ->
 
1563
    del_active_replica(Tab, To),
 
1564
    case val({Tab , where_to_read}) of
 
1565
        To -> 
 
1566
            mnesia_lib:set_remote_where_to_read(Tab);
 
1567
        _ ->
 
1568
            ignore
 
1569
    end.
 
1570
 
 
1571
user_sync_tab(Tab) ->
 
1572
    case val(debug) of
 
1573
        trace ->
 
1574
            mnesia_subscr:subscribe(whereis(mnesia_event), {table, Tab});
 
1575
        _ ->
 
1576
            ignore
 
1577
    end,
 
1578
        
 
1579
    case erase({sync_tab, Tab}) of
 
1580
        undefined ->
 
1581
            ok;
 
1582
        Pids ->
 
1583
            lists:foreach(fun(Pid) -> sync_reply(Pid, Tab) end, Pids)
 
1584
    end.
 
1585
 
 
1586
i_have_tab(Tab) ->
 
1587
    case val({Tab, local_content}) of
 
1588
        true ->
 
1589
            mnesia_lib:set_local_content_whereabouts(Tab);
 
1590
        false ->
 
1591
            set({Tab, where_to_read}, node())
 
1592
    end,
 
1593
    add_active_replica(Tab, node()).
 
1594
 
 
1595
sync_and_block_table_whereabouts(Tab, ToNode, RemoteS, AccessMode) when Tab /= schema ->
 
1596
    Current = val({current, db_nodes}),
 
1597
    Ns = 
 
1598
        case lists:member(ToNode, Current) of
 
1599
            true -> Current -- [ToNode];
 
1600
            false -> Current
 
1601
        end,   
 
1602
    remote_call(ToNode, block_table, [Tab]),
 
1603
    [remote_call(Node, add_active_replica, [Tab, ToNode, RemoteS, AccessMode]) ||
 
1604
        Node <- [ToNode | Ns]],
 
1605
    ok.
 
1606
 
 
1607
sync_del_table_copy_whereabouts(Tab, ToNode) when Tab /= schema ->
 
1608
    Current = val({current, db_nodes}),
 
1609
    Ns =
 
1610
        case lists:member(ToNode, Current) of
 
1611
            true -> Current;
 
1612
            false -> [ToNode | Current]
 
1613
        end,
 
1614
    Args = [Tab, ToNode],
 
1615
    [remote_call(Node, unannounce_add_table_copy, Args) || Node <- Ns],
 
1616
    ok.
 
1617
 
 
1618
get_info(Timeout) ->
 
1619
    case whereis(?SERVER_NAME) of
 
1620
        undefined ->
 
1621
            {timeout, Timeout};
 
1622
        Pid ->
 
1623
            Pid ! {self(), get_state},
 
1624
            receive
 
1625
                {?SERVER_NAME, State} when record(State, state) ->
 
1626
                    {info,State}
 
1627
            after Timeout ->
 
1628
                    {timeout, Timeout}
 
1629
            end
 
1630
    end.
 
1631
 
 
1632
get_workers(Timeout) ->
 
1633
    case whereis(?SERVER_NAME) of
 
1634
        undefined ->
 
1635
            {timeout, Timeout};
 
1636
        Pid ->
 
1637
            Pid ! {self(), get_state},
 
1638
            receive
 
1639
                {?SERVER_NAME, State} when record(State, state) ->
 
1640
                    {workers, State#state.loader_pid, State#state.sender_pid, State#state.dumper_pid}
 
1641
            after Timeout ->
 
1642
                    {timeout, Timeout}
 
1643
            end
 
1644
    end.
 
1645
    
 
1646
info() ->
 
1647
    Tabs = mnesia_lib:local_active_tables(),
 
1648
    io:format( "---> Active tables <--- ~n", []),
 
1649
    info(Tabs).
 
1650
 
 
1651
info([Tab | Tail]) ->
 
1652
    case val({Tab, storage_type}) of
 
1653
        disc_only_copies ->
 
1654
            info_format(Tab, 
 
1655
                        dets:info(Tab, size), 
 
1656
                        dets:info(Tab, file_size),
 
1657
                        "bytes on disc");
 
1658
        _ ->
 
1659
            info_format(Tab, 
 
1660
                        ?ets_info(Tab, size),
 
1661
                        ?ets_info(Tab, memory),
 
1662
                        "words of mem")
 
1663
    end,
 
1664
    info(Tail);
 
1665
info([]) -> ok;
 
1666
info(Tab) -> info([Tab]).
 
1667
 
 
1668
info_format(Tab, Size, Mem, Media) ->
 
1669
    StrT = mnesia_lib:pad_name(atom_to_list(Tab), 15, []),
 
1670
    StrS = mnesia_lib:pad_name(integer_to_list(Size), 8, []),
 
1671
    StrM = mnesia_lib:pad_name(integer_to_list(Mem), 8, []),
 
1672
    io:format("~s: with ~s records occupying ~s ~s~n",
 
1673
              [StrT, StrS, StrM, Media]).
 
1674
 
 
1675
%% Handle early arrived messages
 
1676
handle_early_msgs([Msg | Msgs], State) ->
 
1677
    %% The messages are in reverse order
 
1678
    case handle_early_msg(Msg, State) of
 
1679
        {stop, Reason, Reply, State2} ->
 
1680
            {stop, Reason, Reply, State2};
 
1681
        {stop, Reason, State2} ->
 
1682
            {stop, Reason, State2};
 
1683
        {noreply, State2} ->
 
1684
            handle_early_msgs(Msgs, State2);
 
1685
        {noreply, State2, _Timeout} ->
 
1686
            handle_early_msgs(Msgs, State2);
 
1687
        Else ->  
 
1688
            dbg_out("handle_early_msgs case clause ~p ~n", [Else]),
 
1689
            erlang:error(Else, [[Msg | Msgs], State])
 
1690
    end;
 
1691
handle_early_msgs([], State) ->
 
1692
    noreply(State).
 
1693
 
 
1694
handle_early_msg({call, Msg, From}, State) ->
 
1695
    handle_call(Msg, From, State);
 
1696
handle_early_msg({cast, Msg}, State) ->
 
1697
    handle_cast(Msg, State);
 
1698
handle_early_msg({info, Msg}, State) ->
 
1699
    handle_info(Msg, State).
 
1700
    
 
1701
noreply(State) ->
 
1702
    {noreply, State}.
 
1703
 
 
1704
reply(undefined, Reply) ->
 
1705
    Reply;
 
1706
reply(ReplyTo, Reply) ->
 
1707
    gen_server:reply(ReplyTo, Reply),
 
1708
    Reply.
 
1709
 
 
1710
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
 
1711
%% Worker management
 
1712
 
 
1713
%% Returns new State
 
1714
add_worker(Worker, State) when record(Worker, dump_log) ->
 
1715
    InitBy = Worker#dump_log.initiated_by,
 
1716
    Queue = State#state.dumper_queue,
 
1717
    case lists:keymember(InitBy, #dump_log.initiated_by, Queue) of
 
1718
        false ->
 
1719
            ignore;
 
1720
        true when Worker#dump_log.opt_reply_to == undefined ->
 
1721
            %% The same threshold has been exceeded again,
 
1722
            %% before we have had the possibility to
 
1723
            %% process the older one.
 
1724
            DetectedBy = {dump_log, InitBy},
 
1725
            Event = {mnesia_overload, DetectedBy},
 
1726
            mnesia_lib:report_system_event(Event)
 
1727
    end,
 
1728
    Queue2 = Queue ++ [Worker],
 
1729
    State2 = State#state{dumper_queue = Queue2},
 
1730
    opt_start_worker(State2);
 
1731
add_worker(Worker, State) when record(Worker, schema_commit_lock) ->
 
1732
    Queue = State#state.dumper_queue,
 
1733
    Queue2 = Queue ++ [Worker],
 
1734
    State2 = State#state{dumper_queue = Queue2},
 
1735
    opt_start_worker(State2);
 
1736
add_worker(Worker, State) when record(Worker, net_load) ->
 
1737
    Queue = State#state.loader_queue,
 
1738
    State2 = State#state{loader_queue = Queue ++ [Worker]},
 
1739
    opt_start_worker(State2);
 
1740
add_worker(Worker, State) when record(Worker, send_table) ->
 
1741
    Queue = State#state.sender_queue,
 
1742
    State2 = State#state{sender_queue = Queue ++ [Worker]},
 
1743
    opt_start_worker(State2);
 
1744
add_worker(Worker, State) when record(Worker, disc_load) ->
 
1745
    Queue = State#state.loader_queue,
 
1746
    State2 = State#state{loader_queue = Queue ++ [Worker]},
 
1747
    opt_start_worker(State2);
 
1748
% Block controller should be used for upgrading mnesia.
 
1749
add_worker(Worker, State) when record(Worker, block_controller) -> 
 
1750
    Queue = State#state.dumper_queue,
 
1751
    Queue2 = [Worker | Queue],
 
1752
    State2 = State#state{dumper_queue = Queue2},
 
1753
    opt_start_worker(State2).
 
1754
 
 
1755
%% Optionally start a worker
 
1756
%% 
 
1757
%% Dumpers and loaders may run simultaneously
 
1758
%% but neither of them may run during schema commit.
 
1759
%% Loaders may not start if a schema commit is enqueued.
 
1760
opt_start_worker(State) when State#state.is_stopping == true ->
 
1761
    State;
 
1762
opt_start_worker(State) ->
 
1763
    %% Prioritize dumper and schema commit
 
1764
    %% by checking them first
 
1765
    case State#state.dumper_queue of
 
1766
        [Worker | _Rest] when State#state.dumper_pid == undefined ->
 
1767
            %% Great, a worker in queue and neither
 
1768
            %% a schema transaction is being
 
1769
            %% committed and nor a dumper is running
 
1770
                    
 
1771
            %% Start worker but keep him in the queue
 
1772
            if
 
1773
                record(Worker, schema_commit_lock) ->
 
1774
                    ReplyTo = Worker#schema_commit_lock.owner,
 
1775
                    reply(ReplyTo, granted),
 
1776
                    {Owner, _Tag} = ReplyTo,
 
1777
                    State#state{dumper_pid = Owner};
 
1778
                
 
1779
                record(Worker, dump_log) ->
 
1780
                    Pid = spawn_link(?MODULE, dump_and_reply, [self(), Worker]),
 
1781
                    State2 = State#state{dumper_pid = Pid},
 
1782
 
 
1783
                    %% If the worker was a dumper we may
 
1784
                    %% possibly be able to start a loader
 
1785
                    %% or sender
 
1786
                    State3 = opt_start_sender(State2),
 
1787
                    opt_start_loader(State3);
 
1788
                
 
1789
                record(Worker, block_controller) ->
 
1790
                    case {State#state.sender_pid, State#state.loader_pid} of
 
1791
                        {undefined, undefined} ->
 
1792
                            ReplyTo = Worker#block_controller.owner,
 
1793
                            reply(ReplyTo, granted),
 
1794
                            {Owner, _Tag} = ReplyTo,
 
1795
                            State#state{dumper_pid = Owner};
 
1796
                        _ ->
 
1797
                            State
 
1798
                    end
 
1799
            end;
 
1800
        _ ->
 
1801
            %% Bad luck, try with a loader or sender instead 
 
1802
            State2 = opt_start_sender(State),
 
1803
            opt_start_loader(State2)
 
1804
    end.
 
1805
 
 
1806
opt_start_sender(State) ->
 
1807
    case State#state.sender_queue of
 
1808
        []->
 
1809
            %% No need
 
1810
            State;
 
1811
 
 
1812
        _ when State#state.sender_pid /= undefined ->
 
1813
            %% Bad luck, a sender is already running
 
1814
            State;
 
1815
        
 
1816
        [Sender | _SenderRest] ->
 
1817
            case State#state.loader_queue of
 
1818
                [Loader | _LoaderRest]
 
1819
                when State#state.loader_pid /= undefined,
 
1820
                     Loader#net_load.table == Sender#send_table.table ->
 
1821
                    %% A conflicting loader is running
 
1822
                    State;
 
1823
                _ ->
 
1824
                    SchemaQueue = State#state.dumper_queue,
 
1825
                    case lists:keymember(schema_commit, 1, SchemaQueue) of
 
1826
                        false ->
 
1827
 
 
1828
                            %% Start worker but keep him in the queue
 
1829
                            Pid = spawn_link(?MODULE, send_and_reply,
 
1830
                                             [self(), Sender]),
 
1831
                            State#state{sender_pid = Pid};
 
1832
                        true ->
 
1833
                            %% Bad luck, we must wait for the schema commit
 
1834
                            State
 
1835
                    end
 
1836
            end
 
1837
    end.
 
1838
 
 
1839
opt_start_loader(State) ->
 
1840
    LoaderQueue = State#state.loader_queue,
 
1841
    if
 
1842
        LoaderQueue == [] ->
 
1843
            %% No need
 
1844
            State;
 
1845
 
 
1846
        State#state.loader_pid /= undefined ->
 
1847
            %% Bad luck, an loader is already running
 
1848
            State;
 
1849
        
 
1850
        true ->
 
1851
            SchemaQueue = State#state.dumper_queue,
 
1852
            case lists:keymember(schema_commit, 1, SchemaQueue) of
 
1853
                false ->
 
1854
                    {Worker, Rest} = pick_next(LoaderQueue),
 
1855
 
 
1856
                    %% Start worker but keep him in the queue
 
1857
                    Pid = spawn_link(?MODULE, load_and_reply, [self(), Worker]),
 
1858
                    State#state{loader_pid = Pid,
 
1859
                                loader_queue = [Worker | Rest]};
 
1860
                true ->
 
1861
                    %% Bad luck, we must wait for the schema commit
 
1862
                    State
 
1863
            end
 
1864
    end.
 
1865
 
 
1866
start_remote_sender(Node, Tab, Receiver, Storage) ->
 
1867
    Msg = #send_table{table = Tab,
 
1868
                      receiver_pid = Receiver,
 
1869
                      remote_storage = Storage},
 
1870
    gen_server:cast({?SERVER_NAME, Node}, Msg).
 
1871
 
 
1872
dump_and_reply(ReplyTo, Worker) ->
 
1873
    %% No trap_exit, die intentionally instead
 
1874
    Res = mnesia_dumper:opt_dump_log(Worker#dump_log.initiated_by),
 
1875
    ReplyTo ! #dumper_done{worker_pid = self(),
 
1876
                           worker_res = Res},
 
1877
    unlink(ReplyTo),
 
1878
    exit(normal).
 
1879
 
 
1880
send_and_reply(ReplyTo, Worker) ->
 
1881
    %% No trap_exit, die intentionally instead
 
1882
    Res = mnesia_loader:send_table(Worker#send_table.receiver_pid,
 
1883
                                   Worker#send_table.table,
 
1884
                                   Worker#send_table.remote_storage),
 
1885
    ReplyTo ! #sender_done{worker_pid = self(),
 
1886
                           worker_res = Res},
 
1887
    unlink(ReplyTo),
 
1888
    exit(normal).
 
1889
 
 
1890
 
 
1891
load_and_reply(ReplyTo, Worker) ->
 
1892
    process_flag(trap_exit, true),
 
1893
    Done = load_table(Worker),
 
1894
    ReplyTo ! Done#loader_done{worker_pid = self()},
 
1895
    unlink(ReplyTo),
 
1896
    exit(normal).
 
1897
 
 
1898
%% Now it is time to load the table
 
1899
%% but first we must check if it still is neccessary
 
1900
load_table(Load) when record(Load, net_load) ->
 
1901
    Tab = Load#net_load.table,
 
1902
    ReplyTo = Load#net_load.opt_reply_to,
 
1903
    Reason =  Load#net_load.reason,
 
1904
    LocalC = val({Tab, local_content}),
 
1905
    AccessMode = val({Tab, access_mode}),
 
1906
    ReadNode = val({Tab, where_to_read}),
 
1907
    Active = filter_active(Tab),
 
1908
    Done = #loader_done{is_loaded = true,
 
1909
                        table_name = Tab,
 
1910
                        needs_announce = false,
 
1911
                        needs_sync = false,
 
1912
                        needs_reply = true,
 
1913
                        reply_to = ReplyTo,
 
1914
                        reply = {loaded, ok}
 
1915
                       },
 
1916
    if
 
1917
        ReadNode == node() ->
 
1918
            %% Already loaded locally
 
1919
            Done;
 
1920
        LocalC == true ->
 
1921
            Res = mnesia_loader:disc_load_table(Tab, load_local_content),
 
1922
            Done#loader_done{reply = Res, needs_announce = true, needs_sync = true};
 
1923
        AccessMode == read_only ->
 
1924
            disc_load_table(Tab, Reason, ReplyTo);
 
1925
        true ->
 
1926
            %% Either we cannot read the table yet
 
1927
            %% or someone is moving a replica between
 
1928
            %% two nodes
 
1929
            Cs =  Load#net_load.cstruct,
 
1930
            Res = mnesia_loader:net_load_table(Tab, Reason, Active, Cs),
 
1931
            case Res of
 
1932
                {loaded, ok} ->
 
1933
                    Done#loader_done{needs_sync = true,
 
1934
                                     reply = Res};
 
1935
                {not_loaded, storage_unknown} ->
 
1936
                    Done#loader_done{reply = Res};
 
1937
                {not_loaded, _} ->
 
1938
                    Done#loader_done{is_loaded = false,
 
1939
                                     needs_reply = false,
 
1940
                                     reply = Res}
 
1941
            end
 
1942
    end;
 
1943
 
 
1944
load_table(Load) when record(Load, disc_load) ->
 
1945
    Tab = Load#disc_load.table,
 
1946
    Reason =  Load#disc_load.reason,
 
1947
    ReplyTo = Load#disc_load.opt_reply_to,
 
1948
    ReadNode = val({Tab, where_to_read}),
 
1949
    Active = filter_active(Tab),
 
1950
    Done = #loader_done{is_loaded = true,
 
1951
                        table_name = Tab,
 
1952
                        needs_announce = false,
 
1953
                        needs_sync = false,
 
1954
                        needs_reply = false
 
1955
                       },
 
1956
    if
 
1957
        Active == [], ReadNode == nowhere ->
 
1958
            %% Not loaded anywhere, lets load it from disc
 
1959
            disc_load_table(Tab, Reason, ReplyTo);
 
1960
        ReadNode == nowhere ->
 
1961
            %% Already loaded on other node, lets get it
 
1962
            Cs = val({Tab, cstruct}),
 
1963
            case mnesia_loader:net_load_table(Tab, Reason, Active, Cs) of
 
1964
                {loaded, ok} ->
 
1965
                    Done#loader_done{needs_sync = true};
 
1966
                {not_loaded, storage_unknown} ->
 
1967
                    Done#loader_done{is_loaded = false};
 
1968
                {not_loaded, ErrReason} ->
 
1969
                    Done#loader_done{is_loaded = false,
 
1970
                                     reply = {not_loaded,ErrReason}}
 
1971
            end;
 
1972
        true ->
 
1973
            %% Already readable, do not worry be happy
 
1974
            Done
 
1975
    end.
 
1976
 
 
1977
disc_load_table(Tab, Reason, ReplyTo) ->
 
1978
    Done = #loader_done{is_loaded = true,
 
1979
                        table_name = Tab,
 
1980
                        needs_announce = false,
 
1981
                        needs_sync = false,
 
1982
                        needs_reply = true,
 
1983
                        reply_to = ReplyTo,
 
1984
                        reply = {loaded, ok}
 
1985
                       },
 
1986
    Res = mnesia_loader:disc_load_table(Tab, Reason),
 
1987
    if
 
1988
        Res == {loaded, ok} ->
 
1989
            Done#loader_done{needs_announce = true,
 
1990
                             needs_sync = true,
 
1991
                             reply = Res};
 
1992
        ReplyTo /= undefined ->
 
1993
            Done#loader_done{is_loaded = false,
 
1994
                             reply = Res};
 
1995
        true ->
 
1996
            fatal("Cannot load table ~p from disc: ~p~n", [Tab, Res])
 
1997
    end.
 
1998
 
 
1999
filter_active(Tab) ->
 
2000
    ByForce = val({Tab, load_by_force}),
 
2001
    Active = val({Tab, active_replicas}),
 
2002
    Masters = mnesia_recover:get_master_nodes(Tab),
 
2003
    do_filter_active(ByForce, Active, Masters).
 
2004
 
 
2005
do_filter_active(true, Active, _Masters) ->
 
2006
    Active;
 
2007
do_filter_active(false, Active, []) ->
 
2008
    Active;
 
2009
do_filter_active(false, Active, Masters) ->
 
2010
    mnesia_lib:intersect(Active, Masters).
 
2011
    
 
2012