1
%% ``The contents of this file are subject to the Erlang Public License,
2
%% Version 1.1, (the "License"); you may not use this file except in
3
%% compliance with the License. You should have received a copy of the
4
%% Erlang Public License along with this software. If not, it can be
5
%% retrieved via the world wide web at http://www.erlang.org/.
7
%% Software distributed under the License is distributed on an "AS IS"
8
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
9
%% the License for the specific language governing rights and limitations
12
%% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
13
%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
14
%% AB. All Rights Reserved.''
16
%% $Id: mnesia_checkpoint.erl,v 1.1 2008/12/17 09:53:38 mikpe Exp $
18
-module(mnesia_checkpoint).
20
%% TM callback interface
23
tm_change_table_copy_type/3,
58
%% sys callback interface
65
-include("mnesia.hrl").
66
-import(mnesia_lib, [add/2, del/2, set/2, unset/1]).
67
-import(mnesia_lib, [dbg_out/2]).
69
-record(tm, {log, pending, transactions, checkpoints}).
71
-record(checkpoint_args, {name = {now(), node()},
73
ram_overrides_dump = false,
77
cookie = ?unique_cookie,
81
wait_for_old, % Initially undefined then List
90
%% Old record definition
91
-record(checkpoint, {name,
109
-record(retainer, {cp_name, tab_name, store, writers = [], really_retain = true}).
111
-record(iter, {tab_name, oid_tab, main_tab, retainer_tab, source, val, pid}).
113
-record(pending, {tid, disc_nodes = [], ram_nodes = []}).
116
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
117
%% TM callback functions
120
lists:foreach(fun(Name) -> call(Name, stop) end,
124
tm_prepare(Cp) when record(Cp, checkpoint_args) ->
125
Name = Cp#checkpoint_args.name,
126
case lists:member(Name, checkpoints()) of
130
{error, {already_exists, Name, node()}}
132
tm_prepare(Cp) when record(Cp, checkpoint) ->
133
%% Node with old protocol sent an old checkpoint record
134
%% and we have to convert it
135
case convert_cp_record(Cp) of
142
tm_mnesia_down(Node) ->
143
lists:foreach(fun(Name) -> cast(Name, {mnesia_down, Node}) end,
147
%% Wraps the transaction id and its disc/ram node lists in a #pending{}
%% record and hands that record over to tm_enter_pending/1.
tm_enter_pending(Tid, DiscNs, RamNs) ->
    tm_enter_pending(#pending{tid = Tid,
                              disc_nodes = DiscNs,
                              ram_nodes = RamNs}).
151
%% Looks up the current value of pending_checkpoints with val/1 and
%% passes it, together with the #pending{} record, to tm_enter_pending/2.
tm_enter_pending(Pending) ->
    tm_enter_pending(val(pending_checkpoints), Pending).
155
tm_enter_pending([], Pending) ->
157
tm_enter_pending([Tab | Tabs], Pending) ->
158
catch ?ets_insert(Tab, Pending),
159
tm_enter_pending(Tabs, Pending).
161
%% Looks up the current value of pending_checkpoint_pids with val/1 and
%% lets tm_exit_pending/2 notify each of those processes about Tid.
tm_exit_pending(Tid) ->
    tm_exit_pending(val(pending_checkpoint_pids), Tid).
165
tm_exit_pending([], Tid) ->
167
tm_exit_pending([Pid | Pids], Tid) ->
168
Pid ! {self(), {exit_pending, Tid}},
169
tm_exit_pending(Pids, Tid).
171
enter_still_pending([Tid | Tids], Tab) ->
172
?ets_insert(Tab, #pending{tid = Tid}),
173
enter_still_pending(Tids, Tab);
174
enter_still_pending([], _Tab) ->
178
%% Looks up checkpoints for functions in mnesia_tm.
179
tm_retain(Tid, Tab, Key, Op) ->
180
case val({Tab, commit_work}) of
181
[{checkpoints, Checkpoints} | _ ] ->
182
tm_retain(Tid, Tab, Key, Op, Checkpoints);
187
tm_retain(Tid, Tab, Key, Op, Checkpoints) ->
190
OldRecs = mnesia_lib:db_match_object(Tab, '_'),
191
send_group_retain(OldRecs, Checkpoints, Tid, Tab, []),
194
OldRecs = mnesia_lib:db_get(Tab, Key),
195
send_retain(Checkpoints, {retain, Tid, Tab, Key, OldRecs}),
199
send_group_retain([Rec | Recs], Checkpoints, Tid, Tab, [PrevRec | PrevRecs])
200
when element(2, Rec) /= element(2, PrevRec) ->
201
Key = element(2, PrevRec),
202
OldRecs = lists:reverse([PrevRec | PrevRecs]),
203
send_retain(Checkpoints, {retain, Tid, Tab, Key, OldRecs}),
204
send_group_retain(Recs, Checkpoints, Tid, Tab, [Rec]);
205
send_group_retain([Rec | Recs], Checkpoints, Tid, Tab, Acc) ->
206
send_group_retain(Recs, Checkpoints, Tid, Tab, [Rec | Acc]);
207
send_group_retain([], Checkpoints, Tid, Tab, [PrevRec | PrevRecs]) ->
208
Key = element(2, PrevRec),
209
OldRecs = lists:reverse([PrevRec | PrevRecs]),
210
send_retain(Checkpoints, {retain, Tid, Tab, Key, OldRecs}),
212
send_group_retain([], _Checkpoints, _Tid, _Tab, []) ->
215
send_retain([Name | Names], Msg) ->
217
send_retain(Names, Msg);
218
send_retain([], _Msg) ->
221
tm_add_copy(Tab, Node) when Node /= node() ->
222
case val({Tab, commit_work}) of
223
[{checkpoints, Checkpoints} | _ ] ->
224
Fun = fun(Name) -> call(Name, {add_copy, Tab, Node}) end,
225
map_call(Fun, Checkpoints, ok);
230
tm_del_copy(Tab, Node) when Node == node() ->
231
mnesia_subscr:unsubscribe_table(Tab),
232
case val({Tab, commit_work}) of
233
[{checkpoints, Checkpoints} | _ ] ->
234
Fun = fun(Name) -> call(Name, {del_copy, Tab, Node}) end,
235
map_call(Fun, Checkpoints, ok);
240
tm_change_table_copy_type(Tab, From, To) ->
241
case val({Tab, commit_work}) of
242
[{checkpoints, Checkpoints} | _ ] ->
243
Fun = fun(Name) -> call(Name, {change_copy, Tab, From, To}) end,
244
map_call(Fun, Checkpoints, ok);
249
map_call(Fun, [Name | Names], Res) ->
252
map_call(Fun, Names, Res);
253
{error, {no_exists, Name}} ->
254
map_call(Fun, Names, Res);
256
%% BUGBUG: We may end up with some checkpoint retainers
257
%% too much in the add_copy case. How do we remove them?
258
map_call(Fun, Names, {error, Reason})
260
map_call(_Fun, [], Res) ->
263
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
267
case call(Name, get_checkpoint) of
271
deactivate(Cp#checkpoint_args.nodes, Name)
274
deactivate(Nodes, Name) ->
275
rpc:multicall(Nodes, ?MODULE, remote_deactivate, [Name]),
278
%% Target of the rpc:multicall issued by deactivate/2: sends the
%% deactivate request to the checkpoint named CheckpointName through
%% call/2 and returns whatever call/2 returns.
remote_deactivate(CheckpointName) ->
    call(CheckpointName, deactivate).
281
%% Returns the value stored under the key checkpoints, as read by val/1.
checkpoints() ->
    val(checkpoints).
283
tables_and_cookie(Name) ->
284
case call(Name, get_checkpoint) of
288
Tabs = Cp#checkpoint_args.min ++ Cp#checkpoint_args.max,
289
Cookie = Cp#checkpoint_args.cookie,
293
most_local_node(Name, Tab) ->
294
case ?catch_val({Tab, {retainer, Name}}) of
296
{error, {"No retainer attached to table", [Tab, Name]}};
298
Writers = R#retainer.writers,
299
LocalWriter = lists:member(node(), Writers),
301
LocalWriter == true ->
306
{error, {"No retainer attached to table", [Tab, Name]}}
310
%% Returns the really_retain field of the #retainer{} record stored
%% under {Tab, {retainer, Name}}.
really_retain(Name, Tab) ->
    (val({Tab, {retainer, Name}}))#retainer.really_retain.
314
%% Activate a checkpoint.
316
%% A checkpoint is a transaction consistent state that may be used to
317
%% perform a distributed backup or to rollback the involved tables to
318
%% their old state. Backups may also be used to restore tables to
319
%% their old state. Args is a list of the following tuples:
322
%% Name of checkpoint. Each checkpoint must have a name which
323
%% is unique on the reachable nodes. The name may be reused when
324
%% the checkpoint has been deactivated.
325
%% By default a probably unique name is generated.
326
%% Multiple checkpoints may be set on the same table.
328
%% {allow_remote, Bool}
329
%% false means that all retainers must be local. If the
330
%% table does not reside locally, the checkpoint fails.
331
%% true allows retainers on other nodes.
334
%% Minimize redundancy and only keep checkpoint info together with
335
%% one replica, preferably at the local node. If any node involved in
336
%% the checkpoint goes down, the checkpoint is deactivated.
339
%% Maximize redundancy and keep checkpoint info together with all
340
%% replicas. The checkpoint becomes more fault tolerant if the
341
%% tables have several replicas. When new replicas are added, they
342
%% will also get a retainer attached to them.
344
%% {ram_overrides_dump, Bool}
345
%% {ram_overrides_dump, Tabs}
346
%% Only applicable for ram_copies. Bool controls which versions of
347
%% the records that should be included in the checkpoint state.
348
%% true means that the latest committed records in ram (i.e. the
349
%% records that the application accesses) should be included
350
%% in the checkpoint. false means that the records dumped to
351
%% dat-files (the records that will be loaded at startup) should
352
%% be included in the checkpoint. Tabs is a list of tables.
355
%% {ignore_new, TidList}
356
%% Normally we wait for all pending transactions to complete
357
%% before we allow iteration over the checkpoint. But in order
358
%% to cope with checkpoint activation inside a transaction that
359
%% currently prepares commit (mnesia_init:get_net_work_copy) we
360
%% need to have the ability to ignore the enclosing transaction.
361
%% We do not wait for the transactions in TidList to end. The
362
%% transactions in TidList are regarded as newer than the checkpoint.
365
case args2cp(Args) of
372
args2cp(Args) when list(Args)->
373
case catch lists:foldl(fun check_arg/2, #checkpoint_args{}, Args) of
377
case check_tables(Cp) of
380
{ok, Overriders, AllTabs} ->
381
arrange_retainers(Cp, Overriders, AllTabs)
385
{error, {badarg, Args}}.
387
check_arg({name, Name}, Cp) ->
388
case lists:member(Name, checkpoints()) of
390
exit({already_exists, Name});
392
case catch tab2retainer({foo, Name}) of
393
List when list(List) ->
394
Cp#checkpoint_args{name = Name};
399
check_arg({allow_remote, true}, Cp) ->
400
Cp#checkpoint_args{allow_remote = true};
401
check_arg({allow_remote, false}, Cp) ->
402
Cp#checkpoint_args{allow_remote = false};
403
check_arg({ram_overrides_dump, true}, Cp) ->
404
Cp#checkpoint_args{ram_overrides_dump = true};
405
check_arg({ram_overrides_dump, false}, Cp) ->
406
Cp#checkpoint_args{ram_overrides_dump = false};
407
check_arg({ram_overrides_dump, Tabs}, Cp) when list(Tabs) ->
408
Cp#checkpoint_args{ram_overrides_dump = Tabs};
409
check_arg({min, Tabs}, Cp) when list(Tabs) ->
410
Cp#checkpoint_args{min = Tabs};
411
check_arg({max, Tabs}, Cp) when list(Tabs) ->
412
Cp#checkpoint_args{max = Tabs};
413
check_arg({ignore_new, Tids}, Cp) when list(Tids) ->
414
Cp#checkpoint_args{ignore_new = Tids};
419
Min = Cp#checkpoint_args.min,
420
Max = Cp#checkpoint_args.max,
421
AllTabs = Min ++ Max,
422
DoubleTabs = [T || T <- Min, lists:member(T, Max)],
423
Overriders = Cp#checkpoint_args.ram_overrides_dump,
426
{error, {combine_error, Cp#checkpoint_args.name,
427
[{min, DoubleTabs}, {max, DoubleTabs}]}};
428
Min == [], Max == [] ->
429
{error, {combine_error, Cp#checkpoint_args.name,
430
[{min, Min}, {max, Max}]}};
431
Overriders == false ->
433
Overriders == true ->
434
{ok, AllTabs, AllTabs};
436
case [T || T <- Overriders, not lists:member(T, Min)] of
438
case [T || T <- Overriders, not lists:member(T, Max)] of
440
{ok, Overriders, AllTabs};
442
{error, {combine_error, Cp#checkpoint_args.name,
443
[{ram_overrides_dump, Outsiders},
447
{error, {combine_error, Cp#checkpoint_args.name,
448
[{ram_overrides_dump, Outsiders},
453
arrange_retainers(Cp, Overriders, AllTabs) ->
454
R = #retainer{cp_name = Cp#checkpoint_args.name},
455
case catch [R#retainer{tab_name = Tab,
456
writers = select_writers(Cp, Tab)}
457
|| Tab <- AllTabs] of
461
{ok, Cp#checkpoint_args{ram_overrides_dump = Overriders,
462
retainers = Retainers,
463
nodes = writers(Retainers)}}
466
select_writers(Cp, Tab) ->
467
case filter_remote(Cp, val({Tab, active_replicas})) of
469
exit({"Cannot prepare checkpoint (replica not available)",
470
[Tab, Cp#checkpoint_args.name]});
473
case {lists:member(Tab, Cp#checkpoint_args.max),
474
lists:member(This, Writers)} of
475
{true, _} -> Writers; % Max
476
{false, true} -> [This];
477
{false, false} -> [hd(Writers)]
481
filter_remote(Cp, Writers) when Cp#checkpoint_args.allow_remote == true ->
483
filter_remote(_Cp, Writers) ->
485
case lists:member(This, Writers) of
490
%% Collects the writers lists of all given #retainer{} records into one
%% list and runs it through mnesia_lib:uniq/1.
%% The retainers are reversed first so that the concatenated list has
%% the same element order as prepending each writers list in turn.
writers(Retainers) ->
    PerRetainer = [R#retainer.writers || R <- lists:reverse(Retainers)],
    mnesia_lib:uniq(lists:append(PerRetainer)).
496
Name = Cp#checkpoint_args.name,
497
Nodes = Cp#checkpoint_args.nodes,
498
case mnesia_tm:prepare_checkpoint(Nodes, Cp) of
500
check_prep(Replies, Name, Nodes, Cp#checkpoint_args.ignore_new);
502
{error, {"Cannot prepare checkpoint (bad nodes)",
506
check_prep([{ok, Name, IgnoreNew, _Node} | Replies], Name, Nodes, IgnoreNew) ->
507
check_prep(Replies, Name, Nodes, IgnoreNew);
508
check_prep([{error, Reason} | _Replies], Name, _Nodes, _IgnoreNew) ->
509
{error, {"Cannot prepare checkpoint (bad reply)",
511
check_prep([{badrpc, Reason} | _Replies], Name, _Nodes, _IgnoreNew) ->
512
{error, {"Cannot prepare checkpoint (badrpc)",
514
check_prep([], Name, Nodes, IgnoreNew) ->
515
collect_pending(Name, Nodes, IgnoreNew).
517
collect_pending(Name, Nodes, IgnoreNew) ->
518
case rpc:multicall(Nodes, ?MODULE, call, [Name, collect_pending]) of
520
case catch ?ets_new_table(mnesia_union, [bag]) of
521
{'EXIT', Reason} -> %% system limit
522
Msg = "Cannot create an ets table pending union",
523
{error, {system_limit, Msg, Reason}};
525
compute_union(Replies, Nodes, Name, UnionTab, IgnoreNew)
528
deactivate(Nodes, Name),
529
{error, {"Cannot collect from pending checkpoint", Name, BadNodes}}
532
compute_union([{ok, Pending} | Replies], Nodes, Name, UnionTab, IgnoreNew) ->
533
add_pending(Pending, UnionTab),
534
compute_union(Replies, Nodes, Name, UnionTab, IgnoreNew);
535
compute_union([{error, Reason} | _Replies], Nodes, Name, UnionTab, _IgnoreNew) ->
536
deactivate(Nodes, Name),
537
?ets_delete_table(UnionTab),
539
compute_union([{badrpc, Reason} | _Replies], Nodes, Name, UnionTab, _IgnoreNew) ->
540
deactivate(Nodes, Name),
541
?ets_delete_table(UnionTab),
542
{error, {badrpc, Reason}};
543
compute_union([], Nodes, Name, UnionTab, IgnoreNew) ->
544
send_activate(Nodes, Nodes, Name, UnionTab, IgnoreNew).
546
add_pending([P | Pending], UnionTab) ->
547
add_pending_node(P#pending.disc_nodes, P#pending.tid, UnionTab),
548
add_pending_node(P#pending.ram_nodes, P#pending.tid, UnionTab),
549
add_pending(Pending, UnionTab);
550
add_pending([], _UnionTab) ->
553
add_pending_node([Node | Nodes], Tid, UnionTab) ->
554
?ets_insert(UnionTab, {Node, Tid}),
555
add_pending_node(Nodes, Tid, UnionTab);
556
add_pending_node([], _Tid, _UnionTab) ->
559
send_activate([Node | Nodes], AllNodes, Name, UnionTab, IgnoreNew) ->
560
Pending = [Tid || {_, Tid} <- ?ets_lookup(UnionTab, Node),
561
not lists:member(Tid, IgnoreNew)],
562
case rpc:call(Node, ?MODULE, call, [Name, {activate, Pending}]) of
564
send_activate(Nodes, AllNodes, Name, UnionTab, IgnoreNew);
566
deactivate(Nodes, Name),
567
?ets_delete_table(UnionTab),
568
{error, {"Activation failed (bad node)", Name, Node, Reason}};
570
deactivate(Nodes, Name),
571
?ets_delete_table(UnionTab),
572
{error, {"Activation failed", Name, Node, Reason}}
574
send_activate([], AllNodes, Name, UnionTab, _IgnoreNew) ->
575
?ets_delete_table(UnionTab),
576
{ok, Name, AllNodes}.
578
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
582
case ?catch_val({checkpoint, Name}) of
584
{error, {no_exists, Name}};
592
case cast(Name, Msg) of
594
catch link(Pid), % Always local
597
{'EXIT', Pid, Reason} ->
598
{error, {"Got exit", [Name, Reason]}};
599
{Name, Self, Reply} ->
607
%% Asks every node in Nodes to evaluate ?MODULE:cast(Name, Msg) by way
%% of rpc:eval_everywhere/4.
abcast(Nodes, Name, Msg) ->
    CastArgs = [Name, Msg],
    rpc:eval_everywhere(Nodes, ?MODULE, cast, CastArgs).
610
reply(nopid, _Name, _Reply) ->
612
reply(ReplyTo, Name, Reply) ->
613
ReplyTo ! {Name, ReplyTo, Reply}.
615
%% Returns {ok, NewCp} or {error, Reason}
616
start_retainer(Cp) ->
617
% Will never be restarted
618
Name = Cp#checkpoint_args.name,
619
case supervisor:start_child(mnesia_checkpoint_sup, [Cp]) of
621
{ok, Name, Cp#checkpoint_args.ignore_new, node()};
623
{error, {"Cannot create checkpoint retainer",
624
Name, node(), Reason}}
628
Name = Cp#checkpoint_args.name,
629
Args = [Cp#checkpoint_args{supervisor = self()}],
630
mnesia_monitor:start_proc({?MODULE, Name}, ?MODULE, init, Args).
633
process_flag(trap_exit, true),
634
Name = Cp#checkpoint_args.name,
635
Props = [set, public, {keypos, 2}],
636
case catch ?ets_new_table(mnesia_pending_checkpoint, Props) of
637
{'EXIT', Reason} -> %% system limit
638
Msg = "Cannot create an ets table for pending transactions",
639
Error = {error, {system_limit, Name, Msg, Reason}},
640
proc_lib:init_ack(Cp#checkpoint_args.supervisor, Error);
642
Rs = [prepare_tab(Cp, R) || R <- Cp#checkpoint_args.retainers],
643
Cp2 = Cp#checkpoint_args{retainers = Rs,
645
pending_tab = PendingTab},
646
add(pending_checkpoint_pids, self()),
647
add(pending_checkpoints, PendingTab),
648
set({checkpoint, Name}, self()),
649
add(checkpoints, Name),
650
dbg_out("Checkpoint ~p (~p) started~n", [Name, self()]),
651
proc_lib:init_ack(Cp2#checkpoint_args.supervisor, {ok, self()}),
655
%% Reads the storage type of the retainer's table via val/1 and
%% delegates to prepare_tab/3 with that storage type.
prepare_tab(Cp, R) ->
    Storage = val({R#retainer.tab_name, storage_type}),
    prepare_tab(Cp, R, Storage).
659
prepare_tab(Cp, R, Storage) ->
660
Tab = R#retainer.tab_name,
661
Name = R#retainer.cp_name,
662
case lists:member(node(), R#retainer.writers) of
664
R2 = retainer_create(Cp, R, Tab, Name, Storage),
665
set({Tab, {retainer, Name}}, R2),
666
add({Tab, checkpoints}, Name), %% Keep checkpoint info for table_info & mnesia_session
667
add_chkp_info(Tab, Name),
670
set({Tab, {retainer, Name}}, R#retainer{store = undefined}),
674
add_chkp_info(Tab, Name) ->
675
case val({Tab, commit_work}) of
676
[{checkpoints, OldList} | CommitList] ->
677
case lists:member(Name, OldList) of
681
NewC = [{checkpoints, [Name | OldList]} | CommitList],
682
mnesia_lib:set({Tab, commit_work}, NewC)
685
Chkp = {checkpoints, [Name]},
686
%% OBS checkpoints needs to be first in the list!
687
mnesia_lib:set({Tab, commit_work}, [Chkp | CommitList])
690
%% Builds the retainer file name for the pair {Tab, Name}:
%% "<module>_<Tab>_<Name>.RET", where Name is rendered with
%% io_lib:write/1 and the result is passed through mnesia_lib:dir/1.
tab2retainer({Tab, Name}) ->
    PrintedName = lists:flatten(io_lib:write(Name)),
    BaseName = lists:concat([?MODULE, "_", Tab, "_", PrintedName, ".RET"]),
    mnesia_lib:dir(BaseName).
694
retainer_create(_Cp, R, Tab, Name, disc_only_copies) ->
695
Fname = tab2retainer({Tab, Name}),
697
Args = [{file, Fname}, {type, set}, {keypos, 2}, {repair, false}],
698
{ok, _} = mnesia_lib:dets_sync_open({Tab, Name}, Args),
699
dbg_out("Checkpoint retainer created ~p ~p~n", [Name, Tab]),
700
R#retainer{store = {dets, {Tab, Name}}, really_retain = true};
701
retainer_create(Cp, R, Tab, Name, Storage) ->
702
T = ?ets_new_table(mnesia_retainer, [set, public, {keypos, 2}]),
703
Overriders = Cp#checkpoint_args.ram_overrides_dump,
704
ReallyR = R#retainer.really_retain,
705
ReallyCp = lists:member(Tab, Overriders),
706
ReallyR2 = prepare_ram_tab(Tab, T, Storage, ReallyR, ReallyCp),
707
dbg_out("Checkpoint retainer created ~p ~p~n", [Name, Tab]),
708
R#retainer{store = {ets, T}, really_retain = ReallyR2}.
710
%% Copy the dumped table into retainer if needed
711
%% If the really_retain flag already has been set to false,
712
%% it should remain false even if we change storage type
713
%% while the checkpoint is activated.
714
prepare_ram_tab(Tab, T, ram_copies, true, false) ->
715
Fname = mnesia_lib:tab2dcd(Tab),
716
case mnesia_lib:exists(Fname) of
718
Log = mnesia_log:open_log(prepare_ram_tab,
719
mnesia_log:dcd_log_header(),
721
mnesia_monitor:get_env(auto_repair),
724
Key = element(2, Rec),
726
case ?ets_lookup(T, Key) of
730
?ets_insert(T, {Tab, Key, [Rec | Recs]}),
733
traverse_dcd(mnesia_log:chunk_log(Log, start), Log, Add),
734
mnesia_log:close_log(Log);
739
prepare_ram_tab(_, _, _, ReallyRetain, _) ->
742
traverse_dcd({Cont, [LogH | Rest]}, Log, Fun)
743
when record(LogH, log_header),
744
LogH#log_header.log_kind == dcd_log,
745
LogH#log_header.log_version >= "1.0" ->
746
traverse_dcd({Cont, Rest}, Log, Fun); %% BUGBUG Error handling repaired files
747
traverse_dcd({Cont, Recs}, Log, Fun) -> %% trashed data??
748
lists:foreach(Fun, Recs),
749
traverse_dcd(mnesia_log:chunk_log(Log, Cont), Log, Fun);
750
traverse_dcd(eof, _Log, _Fun) ->
753
%% Looks up Key in a retainer store; the store is tagged with the
%% backend ({ets, Table} or {dets, Table}) that holds it.
retainer_get({ets, EtsTab}, Key) ->
    ?ets_lookup(EtsTab, Key);
retainer_get({dets, DetsTab}, Key) ->
    dets:lookup(DetsTab, Key).
756
%% Inserts Val into a retainer store, using the backend named by the
%% store's tag ({ets, Table} or {dets, Table}).
retainer_put({ets, EtsTab}, Val) ->
    ?ets_insert(EtsTab, Val);
retainer_put({dets, DetsTab}, Val) ->
    dets:insert(DetsTab, Val).
759
%% Returns the first key of a retainer store, using the backend named
%% by the store's tag ({ets, Table} or {dets, Table}).
retainer_first({ets, EtsTab}) ->
    ?ets_first(EtsTab);
retainer_first({dets, DetsTab}) ->
    dets:first(DetsTab).
762
%% Returns the key following Key in a retainer store, using the backend
%% named by the store's tag ({ets, Table} or {dets, Table}).
retainer_next({ets, EtsTab}, Key) ->
    ?ets_next(EtsTab, Key);
retainer_next({dets, DetsTab}, Key) ->
    dets:next(DetsTab, Key).
765
%% retainer_next_slot(Tab, Pos) ->
766
%% case retainer_slot(Tab, Pos) of
767
%% '$end_of_table' ->
770
%% retainer_next_slot(Tab, Pos + 1);
771
%% Recs when list(Recs) ->
775
%% retainer_slot({ets, Store}, Pos) -> ?ets_next(Store, Pos);
776
%% retainer_slot({dets, Store}, Pos) -> dets:slot(Store, Pos).
778
%% Delegates to mnesia_lib:db_fixtable/3 with the storage type that
%% matches the given table reference and the flag Bool (callers in this
%% module pass true when an iteration begins and false when it ends):
%%   Tab (atom)   - a main table; its storage type is read via val/1
%%   {ets, Tab}   - an ets retainer store, treated as ram_copies
%%   {dets, Tab}  - a dets retainer store, treated as disc_only_copies
%% The guard uses is_atom/1; the old-style atom/1 type test is obsolete.
retainer_fixtable(Tab, Bool) when is_atom(Tab) ->
    mnesia_lib:db_fixtable(val({Tab, storage_type}), Tab, Bool);
retainer_fixtable({ets, Tab}, Bool) ->
    mnesia_lib:db_fixtable(ram_copies, Tab, Bool);
retainer_fixtable({dets, Tab}, Bool) ->
    mnesia_lib:db_fixtable(disc_only_copies, Tab, Bool).
785
retainer_delete({ets, Store}) ->
786
?ets_delete_table(Store);
787
retainer_delete({dets, Store}) ->
788
mnesia_lib:dets_sync_close(Store),
789
Fname = tab2retainer(Store),
793
Name = Cp#checkpoint_args.name,
795
{_From, {retain, Tid, Tab, Key, OldRecs}}
796
when Cp#checkpoint_args.wait_for_old == [] ->
797
R = val({Tab, {retainer, Name}}),
798
case R#retainer.really_retain of
800
PendingTab = Cp#checkpoint_args.pending_tab,
801
case catch ?ets_lookup_element(PendingTab, Tid, 1) of
803
Store = R#retainer.store,
804
case retainer_get(Store, Key) of
806
retainer_put(Store, {Tab, Key, OldRecs});
819
{From, deactivate} ->
821
reply(From, Name, deactivated),
825
{'EXIT', Parent, _} when Parent == Cp#checkpoint_args.supervisor ->
827
%% assume that entire Mnesia is terminating
830
{_From, {mnesia_down, Node}} ->
831
Cp2 = do_del_retainers(Cp, Node),
833
{From, get_checkpoint} ->
834
reply(From, Name, Cp),
836
{From, {add_copy, Tab, Node}} when Cp#checkpoint_args.wait_for_old == [] ->
837
{Res, Cp2} = do_add_copy(Cp, Tab, Node),
838
reply(From, Name, Res),
840
{From, {del_copy, Tab, Node}} when Cp#checkpoint_args.wait_for_old == [] ->
841
Cp2 = do_del_copy(Cp, Tab, Node),
842
reply(From, Name, ok),
844
{From, {change_copy, Tab, From, To}} when Cp#checkpoint_args.wait_for_old == [] ->
845
Cp2 = do_change_copy(Cp, Tab, From, To),
846
reply(From, Name, ok),
848
{_From, {add_retainer, R, Node}} ->
849
Cp2 = do_add_retainer(Cp, R, Node),
851
{_From, {del_retainer, R, Node}} when Cp#checkpoint_args.wait_for_old == [] ->
852
Cp2 = do_del_retainer(Cp, R, Node),
856
{From, {iter_begin, Iter}} when Cp#checkpoint_args.wait_for_old == [] ->
857
Cp2 = iter_begin(Cp, From, Iter),
860
{From, {iter_end, Iter}} when Cp#checkpoint_args.wait_for_old == [] ->
861
retainer_fixtable(Iter#iter.oid_tab, false),
862
Iters = Cp#checkpoint_args.iterators -- [Iter],
863
reply(From, Name, ok),
864
retainer_loop(Cp#checkpoint_args{iterators = Iters});
866
{_From, {exit_pending, Tid}}
867
when list(Cp#checkpoint_args.wait_for_old) ->
868
StillPending = lists:delete(Tid, Cp#checkpoint_args.wait_for_old),
869
Cp2 = Cp#checkpoint_args{wait_for_old = StillPending},
870
Cp3 = maybe_activate(Cp2),
873
{From, collect_pending} ->
874
PendingTab = Cp#checkpoint_args.pending_tab,
875
del(pending_checkpoints, PendingTab),
876
Pending = ?ets_match_object(PendingTab, '_'),
877
reply(From, Name, {ok, Pending}),
880
{From, {activate, Pending}} ->
881
StillPending = mnesia_recover:still_pending(Pending),
882
enter_still_pending(StillPending, Cp#checkpoint_args.pending_tab),
883
Cp2 = maybe_activate(Cp#checkpoint_args{wait_for_old = StillPending}),
884
reply(From, Name, activated),
887
{'EXIT', From, _Reason} ->
888
Iters = [Iter || Iter <- Cp#checkpoint_args.iterators,
889
check_iter(From, Iter)],
890
retainer_loop(Cp#checkpoint_args{iterators = Iters});
892
{system, From, Msg} ->
893
dbg_out("~p got {system, ~p, ~p}~n", [?MODULE, From, Msg]),
894
sys:handle_system_msg(Msg, From, no_parent, ?MODULE, [], Cp)
898
when Cp#checkpoint_args.wait_for_old == [],
899
Cp#checkpoint_args.is_activated == false ->
900
Cp#checkpoint_args{pending_tab = undefined, is_activated = true};
901
maybe_activate(Cp) ->
904
%% Starts an iteration on behalf of the process From.
%% Resolves the retainer for the iterator's table, lets init_tabs/2 fill
%% in the table references, records From as the owning pid, calls
%% retainer_fixtable/2 with true on the table being traversed, and
%% replies {ok, Iter, self()} to From.
%% Returns the checkpoint record with the new iterator prepended to its
%% iterators list.
iter_begin(Cp, From, Iter) ->
    CpName = Cp#checkpoint_args.name,
    Retainer = val({Iter#iter.tab_name, {retainer, CpName}}),
    Started = (init_tabs(Retainer, Iter))#iter{pid = From},
    retainer_fixtable(Started#iter.oid_tab, true),
    reply(From, CpName, {ok, Started, self()}),
    Cp#checkpoint_args{iterators = [Started | Cp#checkpoint_args.iterators]}.
915
Name = Cp#checkpoint_args.name,
916
del(pending_checkpoints, Cp#checkpoint_args.pending_tab),
917
del(pending_checkpoint_pids, self()),
918
del(checkpoints, Name),
919
unset({checkpoint, Name}),
920
lists:foreach(fun deactivate_tab/1, Cp#checkpoint_args.retainers),
921
Iters = Cp#checkpoint_args.iterators,
922
lists:foreach(fun(I) -> retainer_fixtable(I#iter.oid_tab, false) end, Iters).
925
Name = R#retainer.cp_name,
926
Tab = R#retainer.tab_name,
927
del({Tab, checkpoints}, Name), %% Keep checkpoint info for table_info & mnesia_session
928
del_chkp_info(Tab, Name),
929
unset({Tab, {retainer, Name}}),
930
Active = lists:member(node(), R#retainer.writers),
931
case R#retainer.store of
934
Store when Active == true ->
935
retainer_delete(Store);
940
del_chkp_info(Tab, Name) ->
941
case val({Tab, commit_work}) of
942
[{checkpoints, ChkList} | Rest] ->
943
case lists:delete(Name, ChkList) of
945
%% The only checkpoint was deleted
946
mnesia_lib:set({Tab, commit_work}, Rest);
948
mnesia_lib:set({Tab, commit_work},
949
[{checkpoints, NewList} | Rest])
954
%% Applies do_del_retainer2/3 for Node to every retainer of the
%% checkpoint, then stores the resulting retainers and the node list
%% recomputed from them by writers/1 in the checkpoint record.
do_del_retainers(Cp, Node) ->
    DropNode = fun(Retainer) -> do_del_retainer2(Cp, Retainer, Node) end,
    Updated = lists:map(DropNode, Cp#checkpoint_args.retainers),
    Cp#checkpoint_args{retainers = Updated, nodes = writers(Updated)}.
958
do_del_retainer2(Cp, R, Node) ->
959
Writers = R#retainer.writers -- [Node],
960
R2 = R#retainer{writers = Writers},
961
set({R2#retainer.tab_name, {retainer, R2#retainer.cp_name}}, R2),
964
Event = {mnesia_checkpoint_deactivated, Cp#checkpoint_args.name},
965
mnesia_lib:report_system_event(Event),
969
deactivate_tab(R), % Avoids unnecessary tm_retain accesses
970
set({R2#retainer.tab_name, {retainer, R2#retainer.cp_name}}, R2),
976
do_del_retainer(Cp, R0, Node) ->
977
{R, Rest} = find_retainer(R0, Cp#checkpoint_args.retainers, []),
978
R2 = do_del_retainer2(Cp, R, Node),
980
Cp#checkpoint_args{retainers = Rs, nodes = writers(Rs)}.
982
%% Handles removal of the local copy of Tab (only defined when ThisNode
%% is the local node).
%% Sends a del_retainer message for the table's retainer to the other
%% nodes of the checkpoint via abcast/3, then performs the same removal
%% locally with do_del_retainer/3 and returns its result.
do_del_copy(Cp, Tab, ThisNode) when ThisNode == node() ->
    CpName = Cp#checkpoint_args.name,
    Retainer = val({Tab, {retainer, CpName}}),
    Remote = lists:delete(ThisNode, Cp#checkpoint_args.nodes),
    abcast(Remote, CpName, {del_retainer, Retainer, ThisNode}),
    do_del_retainer(Cp, Retainer, ThisNode).
989
do_add_copy(Cp, Tab, Node) when Node /= node()->
990
case lists:member(Tab, Cp#checkpoint_args.max) of
994
Name = Cp#checkpoint_args.name,
995
R0 = val({Tab, {retainer, Name}}),
996
W = R0#retainer.writers,
997
R = R0#retainer{writers = W ++ [Node]},
999
case lists:member(Node, Cp#checkpoint_args.nodes) of
1001
send_retainer(Cp, R, Node);
1003
case tm_remote_prepare(Node, Cp) of
1004
{ok, Name, _IgnoreNew, Node} ->
1005
case lists:member(schema, Cp#checkpoint_args.max) of
1007
%% We need to send schema retainer somewhere
1008
RS0 = val({schema, {retainer, Name}}),
1009
W = RS0#retainer.writers,
1010
RS1 = RS0#retainer{writers = W ++ [Node]},
1011
case send_retainer(Cp, RS1, Node) of
1013
send_retainer(Cp1, R, Node);
1018
send_retainer(Cp, R, Node)
1021
{{error, {badrpc, Reason}}, Cp};
1023
{{error, Reason}, Cp}
1028
%% Runs ?MODULE:tm_prepare(Cp) on Node through rpc:call/4 and returns
%% the rpc result unchanged.
tm_remote_prepare(Node, Cp) ->
    PrepareArgs = [Cp],
    rpc:call(Node, ?MODULE, tm_prepare, PrepareArgs).
1031
do_add_retainer(Cp, R0, Node) ->
1032
Writers = R0#retainer.writers,
1033
{R, Rest} = find_retainer(R0, Cp#checkpoint_args.retainers, []),
1037
prepare_tab(Cp, R#retainer{writers = Writers});
1039
R#retainer{writers = Writers}
1041
Rs = [NewRet | Rest],
1042
set({NewRet#retainer.tab_name, {retainer, NewRet#retainer.cp_name}}, NewRet),
1043
Cp#checkpoint_args{retainers = Rs, nodes = writers(Rs)}.
1045
find_retainer(#retainer{cp_name = CP, tab_name = Tab},
1046
[Ret = #retainer{cp_name = CP, tab_name = Tab} | R], Acc) ->
1048
find_retainer(Ret, [H|R], Acc) ->
1049
find_retainer(Ret, R, [H|Acc]).
1051
send_retainer(Cp, R, Node) ->
1052
Name = Cp#checkpoint_args.name,
1053
Nodes0 = Cp#checkpoint_args.nodes -- [Node],
1054
Nodes1 = Nodes0 ++ [Node],
1055
Nodes = Nodes1 -- [node()],
1056
abcast(Nodes, Name, {add_retainer, R, Node}),
1057
Store = R#retainer.store,
1058
%% send_retainer2(Node, Name, Store, retainer_next_slot(Store, 0)),
1059
send_retainer2(Node, Name, Store, retainer_first(Store)),
1060
Cp2 = do_add_retainer(Cp, R, Node),
1063
send_retainer2(_, _, _, '$end_of_table') ->
1065
%%send_retainer2(Node, Name, Store, {Slot, Records}) ->
1066
send_retainer2(Node, Name, Store, Key) ->
1067
[{Tab, _, Records}] = retainer_get(Store, Key),
1068
abcast([Node], Name, {retain, {dirty, send_retainer}, Tab, Key, Records}),
1069
send_retainer2(Node, Name, Store, retainer_next(Store, Key)).
1071
do_change_copy(Cp, Tab, FromType, ToType) ->
1072
Name = Cp#checkpoint_args.name,
1073
R = val({Tab, {retainer, Name}}),
1074
R2 = prepare_tab(Cp, R, ToType),
1075
{_, Old} = R#retainer.store,
1076
{_, New} = R2#retainer.store,
1078
Fname = tab2retainer({Tab, Name}),
1080
FromType == disc_only_copies ->
1081
mnesia_lib:dets_sync_close(Old),
1082
loaded = mnesia_lib:dets_to_ets(Old, New, Fname, set, no, yes),
1083
ok = file:delete(Fname);
1084
ToType == disc_only_copies ->
1085
TabSize = ?ets_info(Old, size),
1086
Props = [{file, Fname},
1089
%% {ram_file, true},
1090
{estimated_no_objects, TabSize + 256},
1092
{ok, _} = mnesia_lib:dets_sync_open(New, Props),
1093
ok = mnesia_dumper:raw_dump_table(New, Old),
1094
?ets_delete_table(Old);
1098
Pos = #retainer.tab_name,
1099
Rs = lists:keyreplace(Tab, Pos, Cp#checkpoint_args.retainers, R2),
1100
Cp#checkpoint_args{retainers = Rs, nodes = writers(Rs)}.
1102
check_iter(From, Iter) when Iter#iter.pid == From ->
1103
retainer_fixtable(Iter#iter.oid_tab, false),
1105
check_iter(_From, _Iter) ->
1108
init_tabs(R, Iter) ->
1109
{Kind, _} = Store = R#retainer.store,
1110
Main = {Kind, Iter#iter.tab_name},
1112
Iter2 = Iter#iter{main_tab = Main, retainer_tab = Ret},
1113
case Iter#iter.source of
1114
table -> Iter2#iter{oid_tab = Main};
1115
retainer -> Iter2#iter{oid_tab = Ret}
1118
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
1121
%% Iterates over a table and applies Fun(ListOfRecords)
1122
%% with a suitable amount of records, e.g. 1000 or so.
1123
%% ListOfRecords is [] when the iteration is over.
1125
%% OidKind affects which internal table to be iterated over and
1126
%% ValKind affects which table to pick the actual records from. Legal
1127
%% values for OidKind and ValKind are the atom table or the atom
1130
%% The iteration may either be performed over the main table (which
1131
%% contains the latest values of the records, i.e. the values that
1132
%% are visible to the applications) or over the checkpoint retainer
1133
%% (which contains the values as they looked at the point in time when
1134
%% the checkpoint was activated).
1136
%% It is possible to iterate over the main table and pick values
1137
%% from the retainer and vice versa.
1139
iterate(Name, Tab, Fun, Acc, Source, Val) ->
1140
Iter0 = #iter{tab_name = Tab, source = Source, val = Val},
1141
case call(Name, {iter_begin, Iter0}) of
1145
link(Pid), % We don't want any pending fixtable's
1146
Res = (catch iter(Fun, Acc, Iter)),
1148
call(Name, {iter_end, Iter}),
1150
{'EXIT', Reason} -> {error, Reason};
1151
{error, Reason} -> {error, Reason};
1156
%% Starts the traversal at the first key of the iterator's oid table
%% and continues in iter/4.
iter(Fun, Acc, Iter) ->
    FirstKey = retainer_first(Iter#iter.oid_tab),
    iter(Fun, Acc, Iter, FirstKey).
1159
iter(Fun, Acc, Iter, Key) ->
1160
case get_records(Iter, Key) of
1161
{'$end_of_table', []} ->
1163
{'$end_of_table', Records} ->
1164
Acc2 = Fun(Records, Acc),
1167
Acc2 = Fun(Records, Acc),
1168
iter(Fun, Acc2, Iter, Next)
1171
%% Aborts an ongoing iteration by throwing {error, {stopped, Reason}}.
%% This function never returns normally.
stop_iteration(Reason) ->
    Stopped = {stopped, Reason},
    throw({error, Stopped}).
1174
%% Fetches the next batch of records starting at Key. A batch covers at
%% most 500 keys; see get_records/4 for the return value.
get_records(Iter, Key) ->
    MaxKeys = 500,
    get_records(Iter, Key, MaxKeys, []).
1177
%% Collects the values of up to Left keys, starting at Key.
%% Returns {NextKey, Records}: Records is the concatenation, in
%% traversal order, of the per-key value lists, and NextKey is the key
%% at which to resume ('$end_of_table' when the table is exhausted).
%% Chunks holds the per-key lists gathered so far, newest first.
get_records(_Iter, Key, Left, Chunks)
  when Left =:= 0; Key =:= '$end_of_table' ->
    %% Either the batch is full or there are no more keys.
    {Key, lists:append(lists:reverse(Chunks))};
get_records(Iter, Key, Left, Chunks) ->
    Chunk = get_val(Iter, Key),
    NextKey = retainer_next(Iter#iter.oid_tab, Key),
    get_records(Iter, NextKey, Left - 1, [Chunk | Chunks]).
1186
%% Picks the value for Key according to the iterator's val field:
%% latest values come from get_latest_val/2, checkpoint values from
%% get_checkpoint_val/2.
get_val(#iter{val = latest} = Iter, Key) ->
    get_latest_val(Iter, Key);
get_val(#iter{val = checkpoint} = Iter, Key) ->
    get_checkpoint_val(Iter, Key).
1191
%% Reads what the main table holds for Key.
%% With source = table the lookup result is returned as is; with
%% source = retainer the result is preceded by the oid {TabName, Key}.
get_latest_val(#iter{source = table, main_tab = Main}, Key) ->
    retainer_get(Main, Key);
get_latest_val(#iter{source = retainer, main_tab = Main, tab_name = Tab}, Key) ->
    [{Tab, Key} | retainer_get(Main, Key)].
1197
get_checkpoint_val(Iter, Key) when Iter#iter.source == table ->
1198
retainer_get(Iter#iter.main_tab, Key);
1199
get_checkpoint_val(Iter, Key) when Iter#iter.source == retainer ->
1200
DeleteOid = {Iter#iter.tab_name, Key},
1201
case retainer_get(Iter#iter.retainer_tab, Key) of
1202
[{_, _, []}] -> [DeleteOid];
1203
[{_, _, Records}] -> [DeleteOid | Records]
1206
%%%%%%%%%%%%%%%%%%%%%%%%%%%
1209
system_continue(_Parent, _Debug, Cp) ->
1212
system_terminate(_Reason, _Parent,_Debug, Cp) ->
1215
system_code_change(Cp, _Module, _OldVsn, _Extra) ->
1218
convert_cp_record(Cp) when record(Cp, checkpoint) ->
1220
case Cp#checkpoint.ram_overrides_dump of
1221
true -> Cp#checkpoint.min ++ Cp#checkpoint.max;
1225
{ok, #checkpoint_args{name = Cp#checkpoint.name,
1226
allow_remote = Cp#checkpoint.name,
1227
ram_overrides_dump = ROD,
1228
nodes = Cp#checkpoint.nodes,
1229
node = Cp#checkpoint.node,
1230
now = Cp#checkpoint.now,
1231
cookie = ?unique_cookie,
1232
min = Cp#checkpoint.min,
1233
max = Cp#checkpoint.max,
1234
pending_tab = Cp#checkpoint.pending_tab,
1235
wait_for_old = Cp#checkpoint.wait_for_old,
1236
is_activated = Cp#checkpoint.is_activated,
1237
ignore_new = Cp#checkpoint.ignore_new,
1238
retainers = Cp#checkpoint.retainers,
1239
iterators = Cp#checkpoint.iterators,
1240
supervisor = Cp#checkpoint.supervisor,
1241
pid = Cp#checkpoint.pid
1243
convert_cp_record(Cp) when record(Cp, checkpoint_args) ->
1244
AllTabs = Cp#checkpoint_args.min ++ Cp#checkpoint_args.max,
1245
ROD = case Cp#checkpoint_args.ram_overrides_dump of
1255
{error, {"Old node cannot handle new checkpoint protocol",
1256
ram_overrides_dump}};
1258
{ok, #checkpoint{name = Cp#checkpoint_args.name,
1259
allow_remote = Cp#checkpoint_args.name,
1260
ram_overrides_dump = ROD,
1261
nodes = Cp#checkpoint_args.nodes,
1262
node = Cp#checkpoint_args.node,
1263
now = Cp#checkpoint_args.now,
1264
min = Cp#checkpoint_args.min,
1265
max = Cp#checkpoint_args.max,
1266
pending_tab = Cp#checkpoint_args.pending_tab,
1267
wait_for_old = Cp#checkpoint_args.wait_for_old,
1268
is_activated = Cp#checkpoint_args.is_activated,
1269
ignore_new = Cp#checkpoint_args.ignore_new,
1270
retainers = Cp#checkpoint_args.retainers,
1271
iterators = Cp#checkpoint_args.iterators,
1272
supervisor = Cp#checkpoint_args.supervisor,
1273
pid = Cp#checkpoint_args.pid
1277
%%%%%%%%%%%%%%%%%%%%%%%%%%
1280
case ?catch_val(Var) of
1281
{'EXIT', _ReASoN_} -> mnesia_lib:other_val(Var, _ReASoN_);