~ubuntu-branches/ubuntu/trusty/erlang/trusty

« back to all changes in this revision

Viewing changes to lib/dialyzer/test/r9c_tests_SUITE_data/src/mnesia/mnesia_checkpoint.erl

  • Committer: Bazaar Package Importer
  • Author(s): Clint Byrum
  • Date: 2011-05-05 15:48:43 UTC
  • mfrom: (3.5.13 sid)
  • Revision ID: james.westby@ubuntu.com-20110505154843-0om6ekzg6m7ugj27
Tags: 1:14.b.2-dfsg-3ubuntu1
* Merge from debian unstable.  Remaining changes:
  - Drop libwxgtk2.8-dev build dependency. Wx isn't in main, and not
    supposed to.
  - Drop erlang-wx binary.
  - Drop erlang-wx dependency from -megaco, -common-test, and -reltool, they
    do not really need wx. Also drop it from -debugger; the GUI needs wx,
    but it apparently has CLI bits as well, and is also needed by -megaco,
    so let's keep the package for now.
  - debian/patches/series: Do what I meant, and enable build-options.patch
    instead.
* Additional changes:
  - Drop erlang-wx from -et
* Dropped Changes:
  - patches/pcre-crash.patch: CVE-2008-2371: outer level option with
    alternatives caused crash. (Applied Upstream)
  - fix for ssl certificate verification in newSSL: 
    ssl_cacertfile_fix.patch (Applied Upstream)
  - debian/patches/series: Enable native.patch again, to get stripped beam
    files and reduce the package size again. (build-options is what
    actually accomplished this)
  - Remove build-options.patch on advice from upstream and because it caused
    odd build failures.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
%% ``The contents of this file are subject to the Erlang Public License,
 
2
%% Version 1.1, (the "License"); you may not use this file except in
 
3
%% compliance with the License. You should have received a copy of the
 
4
%% Erlang Public License along with this software. If not, it can be
 
5
%% retrieved via the world wide web at http://www.erlang.org/.
 
6
%% 
 
7
%% Software distributed under the License is distributed on an "AS IS"
 
8
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
 
9
%% the License for the specific language governing rights and limitations
 
10
%% under the License.
 
11
%% 
 
12
%% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
 
13
%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
 
14
%% AB. All Rights Reserved.''
 
15
%% 
 
16
%%     $Id: mnesia_checkpoint.erl,v 1.1 2008/12/17 09:53:38 mikpe Exp $
 
17
%%
 
18
-module(mnesia_checkpoint).
 
19
 
 
20
%% TM callback interface
 
21
-export([
 
22
         tm_add_copy/2,
 
23
         tm_change_table_copy_type/3,
 
24
         tm_del_copy/2,
 
25
         tm_mnesia_down/1,
 
26
         tm_prepare/1,
 
27
         tm_retain/4,
 
28
         tm_retain/5,
 
29
         tm_enter_pending/1,
 
30
         tm_enter_pending/3,
 
31
         tm_exit_pending/1,
 
32
         convert_cp_record/1
 
33
        ]).
 
34
 
 
35
%% Public interface
 
36
-export([
 
37
         activate/1,
 
38
         checkpoints/0,
 
39
         deactivate/1,
 
40
         deactivate/2,
 
41
         iterate/6,
 
42
         most_local_node/2,
 
43
         really_retain/2,
 
44
         stop/0,
 
45
         stop_iteration/1,
 
46
         tables_and_cookie/1
 
47
        ]).
 
48
 
 
49
%% Internal
 
50
-export([
 
51
         call/2,
 
52
         cast/2,
 
53
         init/1,
 
54
         remote_deactivate/1,
 
55
         start/1
 
56
        ]).
 
57
 
 
58
%% sys callback interface
 
59
-export([
 
60
         system_code_change/4,
 
61
         system_continue/3,
 
62
         system_terminate/4
 
63
        ]).
 
64
 
 
65
-include("mnesia.hrl").
 
66
-import(mnesia_lib, [add/2, del/2, set/2, unset/1]).
 
67
-import(mnesia_lib, [dbg_out/2]).
 
68
 
 
69
-record(tm, {log, pending, transactions, checkpoints}).
 
70
 
 
71
-record(checkpoint_args, {name = {now(), node()},
 
72
                          allow_remote = true,
 
73
                          ram_overrides_dump = false,
 
74
                          nodes = [],
 
75
                          node = node(),
 
76
                          now = now(),
 
77
                          cookie = ?unique_cookie,
 
78
                          min = [],
 
79
                          max = [],
 
80
                          pending_tab,
 
81
                          wait_for_old, % Initially undefined then List
 
82
                          is_activated = false,
 
83
                          ignore_new = [],
 
84
                          retainers = [],
 
85
                          iterators = [],
 
86
                          supervisor,
 
87
                          pid
 
88
                         }).
 
89
 
 
90
%% Old record definition
 
91
-record(checkpoint, {name,
 
92
                     allow_remote,
 
93
                     ram_overrides_dump,
 
94
                     nodes,
 
95
                     node,
 
96
                     now,
 
97
                     min,
 
98
                     max,
 
99
                     pending_tab,
 
100
                     wait_for_old,
 
101
                     is_activated,
 
102
                     ignore_new,
 
103
                     retainers,
 
104
                     iterators,
 
105
                     supervisor,
 
106
                     pid
 
107
                    }).
 
108
 
 
109
-record(retainer, {cp_name, tab_name, store, writers = [], really_retain = true}).
 
110
 
 
111
-record(iter, {tab_name, oid_tab, main_tab, retainer_tab, source, val, pid}).
 
112
 
 
113
-record(pending, {tid, disc_nodes = [], ram_nodes = []}).
 
114
 
 
115
 
 
116
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
 
117
%% TM callback functions
 
118
 
 
119
stop() ->
 
120
    lists:foreach(fun(Name) -> call(Name, stop) end,
 
121
                  checkpoints()),
 
122
    ok.
 
123
 
 
124
tm_prepare(Cp) when record(Cp, checkpoint_args) ->
 
125
    Name = Cp#checkpoint_args.name,
 
126
    case lists:member(Name, checkpoints()) of
 
127
        false ->
 
128
            start_retainer(Cp);
 
129
        true ->
 
130
            {error, {already_exists, Name, node()}}
 
131
    end;
 
132
tm_prepare(Cp) when record(Cp, checkpoint) ->
 
133
    %% Node with old protocol sent an old checkpoint record
 
134
    %% and we have to convert it
 
135
    case convert_cp_record(Cp) of
 
136
        {ok, NewCp} ->
 
137
            tm_prepare(NewCp);
 
138
        {error, Reason} ->
 
139
            {error, Reason}
 
140
    end.
 
141
 
 
142
tm_mnesia_down(Node) ->
 
143
    lists:foreach(fun(Name) -> cast(Name, {mnesia_down, Node}) end,
 
144
                  checkpoints()).
 
145
 
 
146
%% Returns pending
 
147
tm_enter_pending(Tid, DiscNs, RamNs) ->
 
148
    Pending = #pending{tid = Tid, disc_nodes = DiscNs, ram_nodes = RamNs},
 
149
    tm_enter_pending(Pending).
 
150
 
 
151
tm_enter_pending(Pending) ->
 
152
    PendingTabs = val(pending_checkpoints),
 
153
    tm_enter_pending(PendingTabs, Pending).
 
154
 
 
155
tm_enter_pending([], Pending) ->
 
156
    Pending;
 
157
tm_enter_pending([Tab | Tabs], Pending) ->
 
158
    catch ?ets_insert(Tab, Pending),
 
159
    tm_enter_pending(Tabs, Pending).
 
160
 
 
161
tm_exit_pending(Tid) ->
 
162
    Pids = val(pending_checkpoint_pids),
 
163
    tm_exit_pending(Pids, Tid).
 
164
    
 
165
tm_exit_pending([], Tid) ->
 
166
    Tid;
 
167
tm_exit_pending([Pid | Pids], Tid) ->
 
168
    Pid ! {self(), {exit_pending, Tid}},
 
169
    tm_exit_pending(Pids, Tid).
 
170
 
 
171
enter_still_pending([Tid | Tids], Tab) ->
 
172
    ?ets_insert(Tab, #pending{tid = Tid}),
 
173
    enter_still_pending(Tids, Tab);
 
174
enter_still_pending([], _Tab) ->
 
175
    ok.
 
176
 
 
177
 
 
178
%% Looks up checkpoints for functions in mnesia_tm.
 
179
tm_retain(Tid, Tab, Key, Op) ->
 
180
    case val({Tab, commit_work}) of
 
181
        [{checkpoints, Checkpoints} | _ ] ->
 
182
            tm_retain(Tid, Tab, Key, Op, Checkpoints);
 
183
        _ -> 
 
184
            undefined
 
185
    end.
 
186
    
 
187
tm_retain(Tid, Tab, Key, Op, Checkpoints) ->
 
188
    case Op of
 
189
        clear_table ->
 
190
            OldRecs = mnesia_lib:db_match_object(Tab, '_'),
 
191
            send_group_retain(OldRecs, Checkpoints, Tid, Tab, []),
 
192
            OldRecs;
 
193
        _ ->
 
194
            OldRecs = mnesia_lib:db_get(Tab, Key),
 
195
            send_retain(Checkpoints, {retain, Tid, Tab, Key, OldRecs}),
 
196
            OldRecs
 
197
    end.
 
198
 
 
199
send_group_retain([Rec | Recs], Checkpoints, Tid, Tab, [PrevRec | PrevRecs])
 
200
  when element(2, Rec) /= element(2, PrevRec) ->
 
201
    Key = element(2, PrevRec),
 
202
    OldRecs = lists:reverse([PrevRec | PrevRecs]),
 
203
    send_retain(Checkpoints, {retain, Tid, Tab, Key, OldRecs}),
 
204
    send_group_retain(Recs, Checkpoints, Tid, Tab, [Rec]);
 
205
send_group_retain([Rec | Recs], Checkpoints, Tid, Tab, Acc) ->
 
206
    send_group_retain(Recs, Checkpoints, Tid, Tab, [Rec | Acc]);
 
207
send_group_retain([], Checkpoints, Tid, Tab, [PrevRec | PrevRecs]) ->
 
208
    Key = element(2, PrevRec),
 
209
    OldRecs = lists:reverse([PrevRec | PrevRecs]),
 
210
    send_retain(Checkpoints, {retain, Tid, Tab, Key, OldRecs}),
 
211
    ok;
 
212
send_group_retain([], _Checkpoints, _Tid, _Tab, []) ->
 
213
    ok.
 
214
 
 
215
send_retain([Name | Names], Msg) ->
 
216
    cast(Name, Msg),
 
217
    send_retain(Names, Msg);
 
218
send_retain([], _Msg) ->
 
219
    ok.
 
220
    
 
221
tm_add_copy(Tab, Node) when Node /= node() ->
 
222
    case val({Tab, commit_work}) of
 
223
        [{checkpoints, Checkpoints} | _ ] ->
 
224
            Fun = fun(Name) -> call(Name, {add_copy, Tab, Node}) end,
 
225
            map_call(Fun, Checkpoints, ok);
 
226
        _  -> 
 
227
            ok
 
228
    end.
 
229
 
 
230
tm_del_copy(Tab, Node) when Node == node() ->
 
231
    mnesia_subscr:unsubscribe_table(Tab),
 
232
    case val({Tab, commit_work}) of
 
233
        [{checkpoints, Checkpoints} | _ ] ->        
 
234
            Fun = fun(Name) -> call(Name, {del_copy, Tab, Node}) end,
 
235
            map_call(Fun, Checkpoints, ok);
 
236
        _ ->
 
237
            ok
 
238
    end.
 
239
 
 
240
tm_change_table_copy_type(Tab, From, To) ->
 
241
    case val({Tab, commit_work}) of
 
242
        [{checkpoints, Checkpoints} | _ ] ->
 
243
            Fun = fun(Name) -> call(Name, {change_copy, Tab, From, To}) end,
 
244
            map_call(Fun, Checkpoints, ok);
 
245
        _ -> 
 
246
            ok
 
247
    end.
 
248
 
 
249
map_call(Fun, [Name | Names], Res) ->
 
250
    case Fun(Name) of
 
251
         ok ->
 
252
            map_call(Fun, Names, Res);
 
253
        {error, {no_exists, Name}} ->
 
254
            map_call(Fun, Names, Res);
 
255
        {error, Reason} ->
 
256
            %% BUGBUG: We may end up with some checkpoint retainers
 
257
            %% too much in the add_copy case. How do we remove them?
 
258
            map_call(Fun, Names, {error, Reason})
 
259
    end;
 
260
map_call(_Fun, [], Res) ->
 
261
    Res.
 
262
 
 
263
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
 
264
%% Public functions
 
265
 
 
266
deactivate(Name) ->
 
267
    case call(Name, get_checkpoint) of
 
268
        {error, Reason} ->
 
269
            {error, Reason};
 
270
        Cp ->
 
271
            deactivate(Cp#checkpoint_args.nodes, Name)
 
272
    end.
 
273
 
 
274
deactivate(Nodes, Name) ->
 
275
    rpc:multicall(Nodes, ?MODULE, remote_deactivate, [Name]),
 
276
    ok.
 
277
 
 
278
remote_deactivate(Name) ->
 
279
    call(Name, deactivate).
 
280
 
 
281
checkpoints() -> val(checkpoints).
 
282
 
 
283
tables_and_cookie(Name) ->
 
284
    case call(Name, get_checkpoint) of
 
285
        {error, Reason} ->
 
286
            {error, Reason};
 
287
        Cp ->
 
288
            Tabs = Cp#checkpoint_args.min ++ Cp#checkpoint_args.max,
 
289
            Cookie = Cp#checkpoint_args.cookie,
 
290
            {ok, Tabs, Cookie}
 
291
    end.
 
292
 
 
293
most_local_node(Name, Tab) ->
 
294
    case ?catch_val({Tab, {retainer, Name}}) of
 
295
        {'EXIT', _} -> 
 
296
            {error, {"No retainer attached to table", [Tab, Name]}};
 
297
        R ->        
 
298
            Writers = R#retainer.writers,
 
299
            LocalWriter = lists:member(node(), Writers),
 
300
            if
 
301
                LocalWriter == true ->
 
302
                    {ok, node()};
 
303
                Writers /= [] ->
 
304
                    {ok, hd(Writers)};
 
305
                true  ->
 
306
                    {error, {"No retainer attached to table", [Tab, Name]}}
 
307
            end
 
308
    end.
 
309
 
 
310
really_retain(Name, Tab) ->
 
311
    R = val({Tab, {retainer, Name}}),
 
312
    R#retainer.really_retain.
 
313
 
 
314
%% Activate a checkpoint.
 
315
%%
 
316
%% A checkpoint is a transaction consistent state that may be used to
 
317
%% perform a distributed backup or to rollback the involved tables to
 
318
%% their old state. Backups may also be used to restore tables to
 
319
%% their old state. Args is a list of the following tuples:
 
320
%%
 
321
%% {name, Name}
 
322
%%    Name of checkpoint. Each checkpoint must have a name which
 
323
%%    is unique on the reachable nodes. The name may be reused when
 
324
%%    the checkpoint has been deactivated.
 
325
%%    By default a probably unique name is generated.
 
326
%%    Multiple checkpoints may be set on the same table.
 
327
%%
 
328
%% {allow_remote, Bool}
 
329
%%   false means that all retainers must be local. If the
 
330
%%   table does not reside locally, the checkpoint fails.
 
331
%%   true allows retainers on other nodes.
 
332
%%
 
333
%% {min, MinTabs}
 
334
%%   Minimize redundancy and only keep checkpoint info together with
 
335
%%   one replica, preferrably at the local node. If any node involved
 
336
%%   the checkpoint goes down, the checkpoint is deactivated.
 
337
%%
 
338
%% {max, MaxTabs}
 
339
%%    Maximize redundancy and keep checkpoint info together with all
 
340
%%    replicas. The checkpoint becomes more fault tolerant if the
 
341
%%    tables has several replicas. When new replicas are added, they
 
342
%%    will also get a retainer attached to them.
 
343
%%
 
344
%% {ram_overrides_dump, Bool}
 
345
%% {ram_overrides_dump, Tabs}
 
346
%%   Only applicable for ram_copies. Bool controls which versions of
 
347
%%   the records that should be included in the checkpoint state.
 
348
%%   true means that the latest comitted records in ram (i.e. the
 
349
%%   records that the application accesses) should be included
 
350
%%   in the checkpoint. false means that the records dumped to
 
351
%%   dat-files (the records that will be loaded at startup) should
 
352
%%   be included in the checkpoint. Tabs is a list of tables.
 
353
%%   Default is false.
 
354
%%
 
355
%% {ignore_new, TidList}
 
356
%%   Normally we wait for all pending transactions to complete
 
357
%%   before we allow iteration over the checkpoint. But in order
 
358
%%   to cope with checkpoint activation inside a transaction that
 
359
%%   currently prepares commit (mnesia_init:get_net_work_copy) we
 
360
%%   need to have the ability to ignore the enclosing transaction.
 
361
%%   We do not wait for the transactions in TidList to end. The
 
362
%%   transactions in TidList are regarded as newer than the checkpoint.
 
363
 
 
364
activate(Args) ->
 
365
    case args2cp(Args) of
 
366
        {ok, Cp} ->
 
367
            do_activate(Cp);
 
368
        {error, Reason} ->
 
369
            {error, Reason}
 
370
    end.
 
371
 
 
372
args2cp(Args) when list(Args)->
 
373
    case catch lists:foldl(fun check_arg/2, #checkpoint_args{}, Args) of
 
374
        {'EXIT', Reason} ->
 
375
            {error, Reason};
 
376
        Cp ->
 
377
            case check_tables(Cp) of
 
378
                {error, Reason} ->
 
379
                    {error, Reason};
 
380
                {ok, Overriders, AllTabs} ->
 
381
                    arrange_retainers(Cp, Overriders, AllTabs)
 
382
            end
 
383
    end;
 
384
args2cp(Args) ->
 
385
    {error, {badarg, Args}}.
 
386
 
 
387
check_arg({name, Name}, Cp) ->
 
388
    case lists:member(Name, checkpoints()) of
 
389
        true ->
 
390
            exit({already_exists, Name});
 
391
        false ->
 
392
            case catch tab2retainer({foo, Name}) of
 
393
                List when list(List) ->
 
394
                    Cp#checkpoint_args{name = Name};
 
395
                _ ->
 
396
                    exit({badarg, Name})
 
397
            end
 
398
    end;
 
399
check_arg({allow_remote, true}, Cp) ->
 
400
    Cp#checkpoint_args{allow_remote = true};
 
401
check_arg({allow_remote, false}, Cp) ->
 
402
    Cp#checkpoint_args{allow_remote = false};
 
403
check_arg({ram_overrides_dump, true}, Cp) ->
 
404
    Cp#checkpoint_args{ram_overrides_dump = true};
 
405
check_arg({ram_overrides_dump, false}, Cp) ->
 
406
    Cp#checkpoint_args{ram_overrides_dump = false};
 
407
check_arg({ram_overrides_dump, Tabs}, Cp) when list(Tabs) ->
 
408
    Cp#checkpoint_args{ram_overrides_dump = Tabs};
 
409
check_arg({min, Tabs}, Cp) when list(Tabs) ->
 
410
    Cp#checkpoint_args{min = Tabs};
 
411
check_arg({max, Tabs}, Cp) when list(Tabs) ->
 
412
    Cp#checkpoint_args{max = Tabs};
 
413
check_arg({ignore_new, Tids}, Cp) when list(Tids) ->
 
414
    Cp#checkpoint_args{ignore_new = Tids};
 
415
check_arg(Arg, _) ->
 
416
    exit({badarg, Arg}).
 
417
 
 
418
check_tables(Cp) ->
 
419
    Min = Cp#checkpoint_args.min,
 
420
    Max = Cp#checkpoint_args.max,
 
421
    AllTabs = Min ++ Max,
 
422
    DoubleTabs = [T || T <- Min, lists:member(T, Max)],
 
423
    Overriders = Cp#checkpoint_args.ram_overrides_dump,
 
424
    if
 
425
        DoubleTabs /= [] ->
 
426
            {error, {combine_error, Cp#checkpoint_args.name,
 
427
                     [{min, DoubleTabs}, {max, DoubleTabs}]}};
 
428
        Min == [], Max == [] ->
 
429
            {error, {combine_error, Cp#checkpoint_args.name,
 
430
                     [{min, Min}, {max, Max}]}};
 
431
        Overriders == false ->
 
432
            {ok, [], AllTabs};
 
433
        Overriders == true ->
 
434
            {ok, AllTabs, AllTabs};
 
435
        list(Overriders) ->
 
436
            case [T || T <- Overriders, not lists:member(T, Min)] of
 
437
                [] ->
 
438
                    case [T || T <- Overriders, not lists:member(T, Max)] of
 
439
                        [] ->
 
440
                            {ok, Overriders, AllTabs};
 
441
                        Outsiders ->
 
442
                            {error, {combine_error, Cp#checkpoint_args.name,
 
443
                                     [{ram_overrides_dump, Outsiders},
 
444
                                      {max, Outsiders}]}}
 
445
                    end;
 
446
                Outsiders ->
 
447
                    {error, {combine_error, Cp#checkpoint_args.name,
 
448
                             [{ram_overrides_dump, Outsiders},
 
449
                              {min, Outsiders}]}}
 
450
            end
 
451
    end.
 
452
 
 
453
arrange_retainers(Cp, Overriders, AllTabs) ->
 
454
    R = #retainer{cp_name = Cp#checkpoint_args.name},
 
455
    case catch [R#retainer{tab_name = Tab, 
 
456
                           writers = select_writers(Cp, Tab)}
 
457
                || Tab <- AllTabs] of
 
458
        {'EXIT', Reason} ->
 
459
            {error, Reason};
 
460
        Retainers ->
 
461
            {ok, Cp#checkpoint_args{ram_overrides_dump = Overriders,
 
462
                               retainers = Retainers,
 
463
                               nodes = writers(Retainers)}}
 
464
    end.
 
465
 
 
466
select_writers(Cp, Tab) ->
 
467
    case filter_remote(Cp, val({Tab, active_replicas})) of
 
468
        [] ->
 
469
            exit({"Cannot prepare checkpoint (replica not available)",
 
470
                 [Tab, Cp#checkpoint_args.name]});
 
471
        Writers ->
 
472
            This = node(),
 
473
            case {lists:member(Tab, Cp#checkpoint_args.max),
 
474
                  lists:member(This, Writers)} of
 
475
                {true, _} -> Writers; % Max
 
476
                {false, true} -> [This];
 
477
                {false, false} -> [hd(Writers)]
 
478
            end
 
479
    end.
 
480
 
 
481
filter_remote(Cp, Writers) when Cp#checkpoint_args.allow_remote == true ->
 
482
    Writers;
 
483
filter_remote(_Cp, Writers) ->
 
484
    This = node(),
 
485
    case lists:member(This, Writers) of
 
486
        true -> [This];
 
487
        false  -> []
 
488
    end.
 
489
        
 
490
writers(Retainers) ->
 
491
    Fun = fun(R, Acc) -> R#retainer.writers ++ Acc end,
 
492
    Writers = lists:foldl(Fun, [], Retainers),
 
493
    mnesia_lib:uniq(Writers).
 
494
 
 
495
do_activate(Cp) ->
 
496
    Name = Cp#checkpoint_args.name,
 
497
    Nodes = Cp#checkpoint_args.nodes,
 
498
    case mnesia_tm:prepare_checkpoint(Nodes, Cp) of
 
499
        {Replies, []} ->
 
500
            check_prep(Replies, Name, Nodes, Cp#checkpoint_args.ignore_new);
 
501
        {_, BadNodes} ->
 
502
            {error, {"Cannot prepare checkpoint (bad nodes)",
 
503
                     [Name, BadNodes]}}
 
504
    end.
 
505
        
 
506
check_prep([{ok, Name, IgnoreNew, _Node} | Replies], Name, Nodes, IgnoreNew) ->
 
507
    check_prep(Replies, Name, Nodes, IgnoreNew);
 
508
check_prep([{error, Reason} | _Replies], Name, _Nodes, _IgnoreNew) ->
 
509
    {error, {"Cannot prepare checkpoint (bad reply)",
 
510
             [Name, Reason]}};
 
511
check_prep([{badrpc, Reason} | _Replies], Name, _Nodes, _IgnoreNew) ->
 
512
    {error, {"Cannot prepare checkpoint (badrpc)",
 
513
             [Name, Reason]}};
 
514
check_prep([], Name, Nodes, IgnoreNew) ->
 
515
    collect_pending(Name, Nodes, IgnoreNew).
 
516
 
 
517
collect_pending(Name, Nodes, IgnoreNew) ->
 
518
    case rpc:multicall(Nodes, ?MODULE, call, [Name, collect_pending]) of
 
519
        {Replies, []} ->
 
520
            case catch ?ets_new_table(mnesia_union, [bag]) of
 
521
                {'EXIT', Reason} -> %% system limit
 
522
                    Msg = "Cannot create an ets table pending union",
 
523
                    {error, {system_limit, Msg, Reason}};
 
524
                UnionTab ->
 
525
                    compute_union(Replies, Nodes, Name, UnionTab, IgnoreNew)
 
526
            end;
 
527
        {_, BadNodes} ->
 
528
            deactivate(Nodes, Name),
 
529
            {error, {"Cannot collect from pending checkpoint", Name, BadNodes}}
 
530
    end.
 
531
 
 
532
compute_union([{ok, Pending} | Replies], Nodes, Name, UnionTab, IgnoreNew) ->
 
533
    add_pending(Pending, UnionTab),
 
534
    compute_union(Replies, Nodes, Name, UnionTab, IgnoreNew);
 
535
compute_union([{error, Reason} | _Replies], Nodes, Name, UnionTab, _IgnoreNew) ->
 
536
    deactivate(Nodes, Name),
 
537
    ?ets_delete_table(UnionTab),
 
538
    {error, Reason};
 
539
compute_union([{badrpc, Reason} | _Replies], Nodes, Name, UnionTab, _IgnoreNew) ->
 
540
    deactivate(Nodes, Name),
 
541
    ?ets_delete_table(UnionTab),
 
542
    {error, {badrpc, Reason}};
 
543
compute_union([], Nodes, Name, UnionTab, IgnoreNew) ->
 
544
    send_activate(Nodes, Nodes, Name, UnionTab, IgnoreNew).
 
545
 
 
546
add_pending([P | Pending], UnionTab) ->
 
547
    add_pending_node(P#pending.disc_nodes, P#pending.tid, UnionTab),
 
548
    add_pending_node(P#pending.ram_nodes, P#pending.tid, UnionTab),
 
549
    add_pending(Pending, UnionTab);
 
550
add_pending([], _UnionTab) ->
 
551
    ok.
 
552
 
 
553
add_pending_node([Node | Nodes], Tid, UnionTab) ->
 
554
    ?ets_insert(UnionTab, {Node, Tid}),
 
555
    add_pending_node(Nodes, Tid, UnionTab);
 
556
add_pending_node([], _Tid, _UnionTab) ->
 
557
    ok.
 
558
 
 
559
send_activate([Node | Nodes], AllNodes, Name, UnionTab, IgnoreNew) ->
 
560
    Pending = [Tid || {_, Tid} <- ?ets_lookup(UnionTab, Node), 
 
561
                      not lists:member(Tid, IgnoreNew)],
 
562
    case rpc:call(Node, ?MODULE, call, [Name, {activate, Pending}]) of
 
563
        activated ->
 
564
            send_activate(Nodes, AllNodes, Name, UnionTab, IgnoreNew);
 
565
        {badrpc, Reason} ->
 
566
            deactivate(Nodes, Name),
 
567
            ?ets_delete_table(UnionTab),
 
568
            {error, {"Activation failed (bad node)", Name, Node, Reason}};
 
569
        {error, Reason} ->
 
570
            deactivate(Nodes, Name),
 
571
            ?ets_delete_table(UnionTab),
 
572
            {error, {"Activation failed", Name, Node, Reason}}
 
573
    end;
 
574
send_activate([], AllNodes, Name, UnionTab, _IgnoreNew) ->
 
575
    ?ets_delete_table(UnionTab),
 
576
    {ok, Name, AllNodes}.
 
577
 
 
578
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
 
579
%% Checkpoint server
 
580
 
 
581
cast(Name, Msg) ->
 
582
    case ?catch_val({checkpoint, Name}) of
 
583
        {'EXIT', _} ->
 
584
            {error, {no_exists, Name}};
 
585
        
 
586
        Pid when pid(Pid) ->
 
587
            Pid ! {self(), Msg},
 
588
            {ok, Pid}
 
589
    end.
 
590
 
 
591
call(Name, Msg) ->
 
592
    case cast(Name, Msg) of
 
593
        {ok, Pid} ->
 
594
            catch link(Pid), % Always local
 
595
            Self = self(),
 
596
            receive
 
597
                {'EXIT', Pid, Reason} ->
 
598
                    {error, {"Got exit", [Name, Reason]}};
 
599
                {Name, Self, Reply} ->
 
600
                    unlink(Pid),
 
601
                    Reply
 
602
            end;
 
603
        Error ->
 
604
            Error
 
605
    end.
 
606
 
 
607
abcast(Nodes, Name, Msg) ->
 
608
    rpc:eval_everywhere(Nodes, ?MODULE, cast, [Name, Msg]).
 
609
 
 
610
reply(nopid, _Name, _Reply) ->
 
611
    ignore;
 
612
reply(ReplyTo, Name, Reply) ->
 
613
    ReplyTo ! {Name, ReplyTo, Reply}.
 
614
 
 
615
%% Returns {ok, NewCp} or {error, Reason}
 
616
start_retainer(Cp) ->
 
617
    % Will never be restarted
 
618
    Name = Cp#checkpoint_args.name,
 
619
    case supervisor:start_child(mnesia_checkpoint_sup, [Cp]) of
 
620
        {ok, _Pid} ->
 
621
            {ok, Name, Cp#checkpoint_args.ignore_new, node()};
 
622
        {error, Reason} ->
 
623
            {error, {"Cannot create checkpoint retainer",
 
624
                     Name, node(), Reason}}
 
625
    end.
 
626
 
 
627
start(Cp) ->
 
628
    Name = Cp#checkpoint_args.name,
 
629
    Args = [Cp#checkpoint_args{supervisor = self()}],
 
630
    mnesia_monitor:start_proc({?MODULE, Name}, ?MODULE, init, Args).
 
631
 
 
632
init(Cp) ->
 
633
    process_flag(trap_exit, true),
 
634
    Name = Cp#checkpoint_args.name,
 
635
    Props = [set, public, {keypos, 2}],
 
636
    case catch ?ets_new_table(mnesia_pending_checkpoint, Props) of
 
637
        {'EXIT', Reason} -> %% system limit
 
638
            Msg = "Cannot create an ets table for pending transactions",
 
639
            Error = {error, {system_limit, Name, Msg, Reason}},
 
640
            proc_lib:init_ack(Cp#checkpoint_args.supervisor, Error);
 
641
        PendingTab ->
 
642
            Rs = [prepare_tab(Cp, R) || R <- Cp#checkpoint_args.retainers],
 
643
            Cp2 = Cp#checkpoint_args{retainers = Rs,
 
644
                                pid = self(),
 
645
                                pending_tab = PendingTab},
 
646
            add(pending_checkpoint_pids, self()),
 
647
            add(pending_checkpoints, PendingTab),
 
648
            set({checkpoint, Name}, self()),
 
649
            add(checkpoints, Name),
 
650
            dbg_out("Checkpoint ~p (~p) started~n", [Name, self()]),
 
651
            proc_lib:init_ack(Cp2#checkpoint_args.supervisor, {ok, self()}),
 
652
            retainer_loop(Cp2)
 
653
    end.
 
654
    
 
655
prepare_tab(Cp, R) ->
 
656
    Tab = R#retainer.tab_name,
 
657
    prepare_tab(Cp, R, val({Tab, storage_type})).
 
658
 
 
659
prepare_tab(Cp, R, Storage) ->
 
660
    Tab = R#retainer.tab_name,
 
661
    Name = R#retainer.cp_name,
 
662
    case lists:member(node(), R#retainer.writers) of
 
663
        true ->
 
664
            R2 = retainer_create(Cp, R, Tab, Name, Storage),
 
665
            set({Tab, {retainer, Name}}, R2),
 
666
            add({Tab, checkpoints}, Name), %% Keep checkpoint info for table_info & mnesia_session 
 
667
            add_chkp_info(Tab, Name),
 
668
            R2;
 
669
        false ->
 
670
            set({Tab, {retainer, Name}}, R#retainer{store = undefined}),
 
671
            R
 
672
    end.
 
673
 
 
674
add_chkp_info(Tab, Name) ->
 
675
    case val({Tab, commit_work}) of
 
676
        [{checkpoints, OldList} | CommitList] ->
 
677
            case lists:member(Name, OldList) of 
 
678
                true -> 
 
679
                    ok;
 
680
                false -> 
 
681
                    NewC = [{checkpoints, [Name | OldList]} | CommitList],
 
682
                    mnesia_lib:set({Tab, commit_work}, NewC)
 
683
            end;
 
684
        CommitList ->
 
685
            Chkp = {checkpoints, [Name]},
 
686
            %% OBS checkpoints needs to be first in the list!
 
687
            mnesia_lib:set({Tab, commit_work}, [Chkp | CommitList])
 
688
    end.
 
689
 
 
690
tab2retainer({Tab, Name}) ->
 
691
    FlatName = lists:flatten(io_lib:write(Name)),
 
692
    mnesia_lib:dir(lists:concat([?MODULE, "_", Tab, "_", FlatName, ".RET"])).
 
693
 
 
694
retainer_create(_Cp, R, Tab, Name, disc_only_copies) ->
 
695
    Fname = tab2retainer({Tab, Name}),
 
696
    file:delete(Fname),
 
697
    Args = [{file, Fname}, {type, set}, {keypos, 2}, {repair, false}],
 
698
    {ok, _} = mnesia_lib:dets_sync_open({Tab, Name}, Args),
 
699
    dbg_out("Checkpoint retainer created ~p ~p~n", [Name, Tab]),
 
700
    R#retainer{store = {dets, {Tab, Name}}, really_retain = true};
 
701
retainer_create(Cp, R, Tab, Name, Storage) ->
 
702
    T = ?ets_new_table(mnesia_retainer, [set, public, {keypos, 2}]),
 
703
    Overriders = Cp#checkpoint_args.ram_overrides_dump,
 
704
    ReallyR = R#retainer.really_retain,
 
705
    ReallyCp = lists:member(Tab, Overriders),
 
706
    ReallyR2 = prepare_ram_tab(Tab, T, Storage, ReallyR, ReallyCp),
 
707
    dbg_out("Checkpoint retainer created ~p ~p~n", [Name, Tab]),
 
708
    R#retainer{store = {ets, T}, really_retain = ReallyR2}.
 
709
 
 
710
%% Copy the dumped table into retainer if needed
 
711
%% If the really_retain flag already has been set to false,
 
712
%% it should remain false even if we change storage type
 
713
%% while the checkpoint is activated.
 
714
prepare_ram_tab(Tab, T, ram_copies, true, false) ->
 
715
    Fname = mnesia_lib:tab2dcd(Tab),
 
716
    case mnesia_lib:exists(Fname) of
 
717
        true -> 
 
718
            Log = mnesia_log:open_log(prepare_ram_tab, 
 
719
                                      mnesia_log:dcd_log_header(), 
 
720
                                      Fname, true, 
 
721
                                      mnesia_monitor:get_env(auto_repair), 
 
722
                                      read_only),
 
723
            Add = fun(Rec) ->
 
724
                          Key = element(2, Rec),
 
725
                          Recs =
 
726
                              case ?ets_lookup(T, Key) of
 
727
                                  [] -> [];
 
728
                                  [{_, _, Old}] -> Old
 
729
                              end,
 
730
                          ?ets_insert(T, {Tab, Key, [Rec | Recs]}),
 
731
                          continue
 
732
                  end,
 
733
            traverse_dcd(mnesia_log:chunk_log(Log, start), Log, Add),
 
734
            mnesia_log:close_log(Log);
 
735
        false ->
 
736
            ok
 
737
    end,
 
738
    false;
 
739
prepare_ram_tab(_, _, _, ReallyRetain, _) ->
 
740
    ReallyRetain.
 
741
 
 
742
traverse_dcd({Cont, [LogH | Rest]}, Log, Fun) 
 
743
  when record(LogH, log_header),  
 
744
       LogH#log_header.log_kind == dcd_log, 
 
745
       LogH#log_header.log_version >= "1.0" ->    
 
746
    traverse_dcd({Cont, Rest}, Log, Fun);   %% BUGBUG Error handling repaired files
 
747
traverse_dcd({Cont, Recs}, Log, Fun) ->     %% trashed data?? 
 
748
    lists:foreach(Fun, Recs), 
 
749
    traverse_dcd(mnesia_log:chunk_log(Log, Cont), Log, Fun);
 
750
traverse_dcd(eof, _Log, _Fun) ->
 
751
    ok.
 
752
 
 
753
retainer_get({ets, Store}, Key) -> ?ets_lookup(Store, Key);
 
754
retainer_get({dets, Store}, Key) -> dets:lookup(Store, Key).
 
755
 
 
756
retainer_put({ets, Store}, Val) -> ?ets_insert(Store, Val);
 
757
retainer_put({dets, Store}, Val) -> dets:insert(Store, Val).
 
758
 
 
759
retainer_first({ets, Store}) -> ?ets_first(Store);
 
760
retainer_first({dets, Store}) -> dets:first(Store).
 
761
 
 
762
retainer_next({ets, Store}, Key) -> ?ets_next(Store, Key);
 
763
retainer_next({dets, Store}, Key) -> dets:next(Store, Key).
 
764
 
 
765
%% retainer_next_slot(Tab, Pos) ->
 
766
%%     case retainer_slot(Tab, Pos) of
 
767
%%         '$end_of_table' ->
 
768
%%             '$end_of_table';
 
769
%%         [] ->
 
770
%%             retainer_next_slot(Tab, Pos + 1);
 
771
%%         Recs when list(Recs) ->
 
772
%%             {Pos, Recs}
 
773
%%     end.
 
774
%% 
 
775
%% retainer_slot({ets, Store}, Pos) -> ?ets_next(Store, Pos);
 
776
%% retainer_slot({dets, Store}, Pos) -> dets:slot(Store, Pos).
 
777
 
 
778
retainer_fixtable(Tab, Bool) when atom(Tab) ->
 
779
    mnesia_lib:db_fixtable(val({Tab, storage_type}), Tab, Bool);
 
780
retainer_fixtable({ets, Tab}, Bool) ->
 
781
    mnesia_lib:db_fixtable(ram_copies, Tab, Bool);
 
782
retainer_fixtable({dets, Tab}, Bool) ->
 
783
    mnesia_lib:db_fixtable(disc_only_copies, Tab, Bool).
 
784
 
 
785
retainer_delete({ets, Store}) ->
 
786
    ?ets_delete_table(Store);
 
787
retainer_delete({dets, Store}) ->
 
788
    mnesia_lib:dets_sync_close(Store),
 
789
    Fname = tab2retainer(Store),
 
790
    file:delete(Fname).
 
791
 
 
792
retainer_loop(Cp) ->
 
793
    Name = Cp#checkpoint_args.name,
 
794
    receive
 
795
        {_From, {retain, Tid, Tab, Key, OldRecs}}
 
796
        when Cp#checkpoint_args.wait_for_old == [] ->
 
797
            R = val({Tab, {retainer, Name}}),
 
798
            case R#retainer.really_retain of
 
799
                true ->
 
800
                    PendingTab = Cp#checkpoint_args.pending_tab,
 
801
                    case catch ?ets_lookup_element(PendingTab, Tid, 1) of
 
802
                        {'EXIT', _} ->
 
803
                            Store = R#retainer.store,
 
804
                            case retainer_get(Store, Key) of
 
805
                                [] ->
 
806
                                    retainer_put(Store, {Tab, Key, OldRecs});
 
807
                                _ ->
 
808
                                    already_retained
 
809
                            end;
 
810
                        pending ->
 
811
                            ignore
 
812
                    end;
 
813
                false ->
 
814
                    ignore
 
815
            end,
 
816
            retainer_loop(Cp);
 
817
        
 
818
        %% Adm
 
819
        {From, deactivate} ->
 
820
            do_stop(Cp),
 
821
            reply(From, Name, deactivated),
 
822
            unlink(From),
 
823
            exit(shutdown);
 
824
 
 
825
        {'EXIT', Parent, _} when Parent == Cp#checkpoint_args.supervisor ->
 
826
            %% do_stop(Cp),
 
827
            %% assume that entire Mnesia is terminating
 
828
            exit(shutdown);
 
829
 
 
830
        {_From, {mnesia_down, Node}} ->
 
831
            Cp2 = do_del_retainers(Cp, Node),
 
832
            retainer_loop(Cp2);
 
833
        {From, get_checkpoint} ->
 
834
            reply(From, Name, Cp),
 
835
            retainer_loop(Cp);
 
836
        {From, {add_copy, Tab, Node}} when Cp#checkpoint_args.wait_for_old == [] ->
 
837
            {Res, Cp2} = do_add_copy(Cp, Tab, Node),
 
838
            reply(From, Name, Res),
 
839
            retainer_loop(Cp2);
 
840
        {From, {del_copy, Tab, Node}} when Cp#checkpoint_args.wait_for_old == [] ->
 
841
            Cp2 = do_del_copy(Cp, Tab, Node),
 
842
            reply(From, Name, ok),
 
843
            retainer_loop(Cp2);
 
844
        {From, {change_copy, Tab, From, To}} when Cp#checkpoint_args.wait_for_old == [] ->
 
845
            Cp2 = do_change_copy(Cp, Tab, From, To),
 
846
            reply(From, Name, ok),
 
847
            retainer_loop(Cp2);
 
848
        {_From, {add_retainer, R, Node}} ->
 
849
            Cp2 = do_add_retainer(Cp, R, Node),
 
850
            retainer_loop(Cp2);
 
851
        {_From, {del_retainer, R, Node}} when Cp#checkpoint_args.wait_for_old == [] ->
 
852
            Cp2 = do_del_retainer(Cp, R, Node),
 
853
            retainer_loop(Cp2);
 
854
 
 
855
        %% Iteration
 
856
        {From, {iter_begin, Iter}} when Cp#checkpoint_args.wait_for_old == [] ->
 
857
            Cp2 = iter_begin(Cp, From, Iter),
 
858
            retainer_loop(Cp2);
 
859
 
 
860
        {From, {iter_end, Iter}} when Cp#checkpoint_args.wait_for_old == [] ->
 
861
            retainer_fixtable(Iter#iter.oid_tab, false),
 
862
            Iters = Cp#checkpoint_args.iterators -- [Iter],
 
863
            reply(From, Name, ok),
 
864
            retainer_loop(Cp#checkpoint_args{iterators = Iters});       
 
865
 
 
866
        {_From, {exit_pending, Tid}}
 
867
            when list(Cp#checkpoint_args.wait_for_old) ->
 
868
            StillPending = lists:delete(Tid, Cp#checkpoint_args.wait_for_old),
 
869
            Cp2 = Cp#checkpoint_args{wait_for_old = StillPending},
 
870
            Cp3 = maybe_activate(Cp2),
 
871
            retainer_loop(Cp3);
 
872
 
 
873
        {From, collect_pending} ->
 
874
            PendingTab = Cp#checkpoint_args.pending_tab,
 
875
            del(pending_checkpoints, PendingTab),
 
876
            Pending = ?ets_match_object(PendingTab, '_'),
 
877
            reply(From, Name, {ok, Pending}),
 
878
            retainer_loop(Cp);
 
879
 
 
880
        {From, {activate, Pending}} ->
 
881
            StillPending = mnesia_recover:still_pending(Pending),
 
882
            enter_still_pending(StillPending, Cp#checkpoint_args.pending_tab),
 
883
            Cp2 = maybe_activate(Cp#checkpoint_args{wait_for_old = StillPending}),
 
884
            reply(From, Name, activated),
 
885
            retainer_loop(Cp2);
 
886
 
 
887
        {'EXIT', From, _Reason} ->
 
888
            Iters = [Iter || Iter <- Cp#checkpoint_args.iterators,
 
889
                             check_iter(From, Iter)],
 
890
            retainer_loop(Cp#checkpoint_args{iterators = Iters});
 
891
 
 
892
        {system, From, Msg} ->
 
893
            dbg_out("~p got {system, ~p, ~p}~n", [?MODULE, From, Msg]),
 
894
            sys:handle_system_msg(Msg, From, no_parent, ?MODULE, [], Cp)
 
895
    end.
 
896
 
 
897
maybe_activate(Cp)
 
898
        when Cp#checkpoint_args.wait_for_old == [],
 
899
             Cp#checkpoint_args.is_activated == false ->
 
900
    Cp#checkpoint_args{pending_tab = undefined, is_activated = true};
 
901
maybe_activate(Cp) ->
 
902
    Cp.
 
903
 
 
904
iter_begin(Cp, From, Iter) ->
 
905
    Name = Cp#checkpoint_args.name,
 
906
    R = val({Iter#iter.tab_name, {retainer, Name}}),
 
907
    Iter2 = init_tabs(R, Iter),
 
908
    Iter3 = Iter2#iter{pid = From},
 
909
    retainer_fixtable(Iter3#iter.oid_tab, true),
 
910
    Iters = [Iter3 | Cp#checkpoint_args.iterators],
 
911
    reply(From, Name, {ok, Iter3, self()}),
 
912
    Cp#checkpoint_args{iterators = Iters}.
 
913
 
 
914
do_stop(Cp) ->
 
915
    Name = Cp#checkpoint_args.name,
 
916
    del(pending_checkpoints, Cp#checkpoint_args.pending_tab),
 
917
    del(pending_checkpoint_pids, self()),
 
918
    del(checkpoints, Name),
 
919
    unset({checkpoint, Name}),
 
920
    lists:foreach(fun deactivate_tab/1, Cp#checkpoint_args.retainers),
 
921
    Iters = Cp#checkpoint_args.iterators,
 
922
    lists:foreach(fun(I) -> retainer_fixtable(I#iter.oid_tab, false) end, Iters).
 
923
 
 
924
deactivate_tab(R) ->
 
925
    Name = R#retainer.cp_name,
 
926
    Tab = R#retainer.tab_name,
 
927
    del({Tab, checkpoints}, Name),   %% Keep checkpoint info for table_info & mnesia_session 
 
928
    del_chkp_info(Tab, Name),
 
929
    unset({Tab, {retainer, Name}}),
 
930
    Active = lists:member(node(), R#retainer.writers),
 
931
    case R#retainer.store of
 
932
        undefined ->
 
933
            ignore;
 
934
        Store when Active == true ->
 
935
            retainer_delete(Store);
 
936
        _ ->
 
937
            ignore
 
938
    end.
 
939
 
 
940
del_chkp_info(Tab, Name) ->   
 
941
    case val({Tab, commit_work}) of
 
942
        [{checkpoints, ChkList} | Rest] -> 
 
943
            case lists:delete(Name, ChkList) of
 
944
                [] -> 
 
945
                    %% The only checkpoint was deleted
 
946
                    mnesia_lib:set({Tab, commit_work}, Rest);
 
947
                NewList ->
 
948
                    mnesia_lib:set({Tab, commit_work}, 
 
949
                                   [{checkpoints, NewList} | Rest])
 
950
            end;
 
951
        _  -> ignore
 
952
    end.
 
953
 
 
954
do_del_retainers(Cp, Node) ->
 
955
    Rs = [do_del_retainer2(Cp, R, Node) || R <- Cp#checkpoint_args.retainers],
 
956
    Cp#checkpoint_args{retainers = Rs, nodes = writers(Rs)}.
 
957
 
 
958
do_del_retainer2(Cp, R, Node) ->
 
959
    Writers = R#retainer.writers -- [Node],
 
960
    R2 = R#retainer{writers = Writers},
 
961
    set({R2#retainer.tab_name, {retainer, R2#retainer.cp_name}}, R2),
 
962
    if
 
963
        Writers == [] ->
 
964
            Event = {mnesia_checkpoint_deactivated, Cp#checkpoint_args.name},
 
965
            mnesia_lib:report_system_event(Event),
 
966
            do_stop(Cp),
 
967
            exit(shutdown);
 
968
        Node == node() ->
 
969
            deactivate_tab(R), % Avoids unnecessary tm_retain accesses
 
970
            set({R2#retainer.tab_name, {retainer, R2#retainer.cp_name}}, R2),
 
971
            R2; 
 
972
        true ->
 
973
            R2
 
974
    end.
 
975
 
 
976
do_del_retainer(Cp, R0, Node) ->
 
977
    {R, Rest} = find_retainer(R0, Cp#checkpoint_args.retainers, []),
 
978
    R2 = do_del_retainer2(Cp, R, Node),
 
979
    Rs = [R2|Rest],
 
980
    Cp#checkpoint_args{retainers = Rs, nodes = writers(Rs)}.
 
981
 
 
982
do_del_copy(Cp, Tab, ThisNode) when ThisNode == node() ->
 
983
    Name = Cp#checkpoint_args.name,
 
984
    Others = Cp#checkpoint_args.nodes -- [ThisNode],
 
985
    R = val({Tab, {retainer, Name}}),
 
986
    abcast(Others, Name, {del_retainer, R, ThisNode}),
 
987
    do_del_retainer(Cp, R, ThisNode).
 
988
 
 
989
do_add_copy(Cp, Tab, Node) when Node /= node()->
 
990
    case lists:member(Tab, Cp#checkpoint_args.max) of
 
991
        false ->
 
992
            {ok, Cp};
 
993
        true ->
 
994
            Name = Cp#checkpoint_args.name,
 
995
            R0 = val({Tab, {retainer, Name}}),
 
996
            W = R0#retainer.writers,
 
997
            R = R0#retainer{writers = W ++ [Node]},
 
998
 
 
999
            case lists:member(Node, Cp#checkpoint_args.nodes) of
 
1000
                true ->
 
1001
                    send_retainer(Cp, R, Node);
 
1002
                false ->
 
1003
                    case tm_remote_prepare(Node, Cp) of
 
1004
                        {ok, Name, _IgnoreNew, Node} ->
 
1005
                            case lists:member(schema, Cp#checkpoint_args.max) of
 
1006
                                true ->
 
1007
                                    %% We need to send schema retainer somewhere
 
1008
                                    RS0 = val({schema, {retainer, Name}}),
 
1009
                                    W = RS0#retainer.writers,
 
1010
                                    RS1 = RS0#retainer{writers = W ++ [Node]},
 
1011
                                    case send_retainer(Cp, RS1, Node) of
 
1012
                                        {ok, Cp1} ->
 
1013
                                            send_retainer(Cp1, R, Node);
 
1014
                                        Error ->
 
1015
                                            Error
 
1016
                                    end;
 
1017
                                false ->
 
1018
                                    send_retainer(Cp, R, Node)
 
1019
                            end;
 
1020
                        {badrpc, Reason} ->
 
1021
                            {{error, {badrpc, Reason}}, Cp};
 
1022
                        {error, Reason} ->
 
1023
                            {{error, Reason}, Cp}
 
1024
                    end
 
1025
            end
 
1026
    end.
 
1027
 
 
1028
tm_remote_prepare(Node, Cp) ->
 
1029
    rpc:call(Node, ?MODULE, tm_prepare, [Cp]).
 
1030
  
 
1031
do_add_retainer(Cp, R0, Node) ->
 
1032
    Writers = R0#retainer.writers,
 
1033
    {R, Rest} = find_retainer(R0, Cp#checkpoint_args.retainers, []),    
 
1034
    NewRet = 
 
1035
        if 
 
1036
            Node == node() ->
 
1037
                prepare_tab(Cp, R#retainer{writers = Writers});
 
1038
            true -> 
 
1039
                R#retainer{writers = Writers}
 
1040
        end,
 
1041
    Rs = [NewRet | Rest],
 
1042
    set({NewRet#retainer.tab_name, {retainer, NewRet#retainer.cp_name}}, NewRet),
 
1043
    Cp#checkpoint_args{retainers = Rs, nodes = writers(Rs)}.
 
1044
 
 
1045
find_retainer(#retainer{cp_name = CP, tab_name = Tab}, 
 
1046
              [Ret = #retainer{cp_name = CP, tab_name = Tab} | R], Acc) ->
 
1047
    {Ret, R ++ Acc};
 
1048
find_retainer(Ret, [H|R], Acc) ->
 
1049
    find_retainer(Ret, R, [H|Acc]).
 
1050
 
 
1051
send_retainer(Cp, R, Node) ->
 
1052
    Name = Cp#checkpoint_args.name,
 
1053
    Nodes0 = Cp#checkpoint_args.nodes -- [Node],
 
1054
    Nodes1 = Nodes0 ++ [Node],
 
1055
    Nodes = Nodes1 -- [node()],
 
1056
    abcast(Nodes, Name, {add_retainer, R, Node}),
 
1057
    Store = R#retainer.store,
 
1058
%%    send_retainer2(Node, Name, Store, retainer_next_slot(Store, 0)),
 
1059
    send_retainer2(Node, Name, Store, retainer_first(Store)),
 
1060
    Cp2 = do_add_retainer(Cp, R, Node),
 
1061
    {ok, Cp2}.
 
1062
 
 
1063
send_retainer2(_, _, _, '$end_of_table') ->
 
1064
    ok;
 
1065
%%send_retainer2(Node, Name, Store, {Slot, Records}) ->
 
1066
send_retainer2(Node, Name, Store, Key) ->
 
1067
    [{Tab, _, Records}] = retainer_get(Store, Key),
 
1068
    abcast([Node], Name, {retain, {dirty, send_retainer}, Tab, Key, Records}),
 
1069
    send_retainer2(Node, Name, Store, retainer_next(Store, Key)).
 
1070
 
 
1071
do_change_copy(Cp, Tab, FromType, ToType) ->
 
1072
    Name = Cp#checkpoint_args.name,
 
1073
    R = val({Tab, {retainer, Name}}),
 
1074
    R2 = prepare_tab(Cp, R, ToType),
 
1075
    {_, Old} = R#retainer.store,
 
1076
    {_, New} = R2#retainer.store,
 
1077
 
 
1078
    Fname = tab2retainer({Tab, Name}),
 
1079
    if
 
1080
        FromType == disc_only_copies ->
 
1081
            mnesia_lib:dets_sync_close(Old),
 
1082
            loaded = mnesia_lib:dets_to_ets(Old, New, Fname, set, no, yes),
 
1083
            ok = file:delete(Fname);
 
1084
        ToType == disc_only_copies ->
 
1085
            TabSize = ?ets_info(Old, size),
 
1086
            Props = [{file, Fname},
 
1087
                     {type, set},
 
1088
                     {keypos, 2},
 
1089
%%                   {ram_file, true},
 
1090
                     {estimated_no_objects, TabSize + 256},
 
1091
                     {repair, false}],
 
1092
            {ok, _} = mnesia_lib:dets_sync_open(New, Props),
 
1093
            ok = mnesia_dumper:raw_dump_table(New, Old),
 
1094
            ?ets_delete_table(Old);
 
1095
        true ->
 
1096
            ignore
 
1097
    end,
 
1098
    Pos = #retainer.tab_name,
 
1099
    Rs = lists:keyreplace(Tab, Pos, Cp#checkpoint_args.retainers, R2),
 
1100
    Cp#checkpoint_args{retainers = Rs, nodes = writers(Rs)}.
 
1101
 
 
1102
check_iter(From, Iter) when Iter#iter.pid == From ->
 
1103
    retainer_fixtable(Iter#iter.oid_tab, false),
 
1104
    false;
 
1105
check_iter(_From, _Iter) ->
 
1106
    true.
 
1107
 
 
1108
init_tabs(R, Iter) ->
 
1109
    {Kind, _} = Store = R#retainer.store,
 
1110
    Main = {Kind, Iter#iter.tab_name},
 
1111
    Ret = Store,
 
1112
    Iter2 = Iter#iter{main_tab = Main, retainer_tab = Ret},
 
1113
    case Iter#iter.source of
 
1114
        table -> Iter2#iter{oid_tab = Main};
 
1115
        retainer -> Iter2#iter{oid_tab = Ret}
 
1116
    end.
 
1117
 
 
1118
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
 
1119
%% Iteration
 
1120
%%
 
1121
%% Iterates over a table and applies Fun(ListOfRecords)
 
1122
%% with a suitable amount of records, e.g. 1000 or so.
 
1123
%% ListOfRecords is [] when the iteration is over.
 
1124
%%
 
1125
%% OidKind affects which internal table to be iterated over and
 
1126
%% ValKind affects which table to pick the actual records from. Legal
 
1127
%% values for OidKind and ValKind is the atom table or the atom
 
1128
%% retainer.
 
1129
%%
 
1130
%% The iteration may either be performed over the main table (which
 
1131
%% contains the latest values of the records, i.e. the values that
 
1132
%% are visible to the applications) or over the checkpoint retainer
 
1133
%% (which contains the values as the looked like the timepoint when
 
1134
%% the checkpoint was activated).
 
1135
%%
 
1136
%% It is possible to iterate over the main table and pick values
 
1137
%% from the retainer and vice versa.
 
1138
 
 
1139
iterate(Name, Tab, Fun, Acc, Source, Val) ->
 
1140
    Iter0 = #iter{tab_name = Tab, source = Source, val = Val},
 
1141
    case call(Name, {iter_begin, Iter0}) of
 
1142
        {error, Reason} ->
 
1143
            {error, Reason};
 
1144
        {ok, Iter, Pid} ->
 
1145
            link(Pid), % We don't want any pending fixtable's
 
1146
            Res = (catch iter(Fun, Acc, Iter)),
 
1147
            unlink(Pid),
 
1148
            call(Name, {iter_end, Iter}),
 
1149
            case Res of
 
1150
                {'EXIT', Reason} -> {error, Reason};
 
1151
                {error, Reason} -> {error, Reason};
 
1152
                Acc2 -> {ok, Acc2}
 
1153
            end
 
1154
    end.
 
1155
 
 
1156
iter(Fun, Acc, Iter)->
 
1157
    iter(Fun, Acc, Iter, retainer_first(Iter#iter.oid_tab)).
 
1158
 
 
1159
iter(Fun, Acc, Iter, Key) ->
 
1160
    case get_records(Iter, Key) of
 
1161
        {'$end_of_table', []} ->
 
1162
            Fun([], Acc);
 
1163
        {'$end_of_table', Records} ->
 
1164
            Acc2 = Fun(Records, Acc),
 
1165
            Fun([], Acc2);
 
1166
        {Next, Records} ->
 
1167
            Acc2 = Fun(Records, Acc),
 
1168
            iter(Fun, Acc2, Iter, Next)
 
1169
    end.
 
1170
 
 
1171
stop_iteration(Reason) ->
 
1172
    throw({error, {stopped, Reason}}).
 
1173
 
 
1174
get_records(Iter, Key) ->
 
1175
    get_records(Iter, Key, 500, []). % 500 keys
 
1176
 
 
1177
get_records(_Iter, Key, 0, Acc) ->
 
1178
    {Key, lists:append(lists:reverse(Acc))};
 
1179
get_records(_Iter, '$end_of_table', _I, Acc) ->
 
1180
    {'$end_of_table', lists:append(lists:reverse(Acc))};
 
1181
get_records(Iter, Key, I, Acc) ->
 
1182
    Recs = get_val(Iter, Key),
 
1183
    Next = retainer_next(Iter#iter.oid_tab, Key),
 
1184
    get_records(Iter, Next, I-1, [Recs | Acc]).
 
1185
 
 
1186
get_val(Iter, Key) when Iter#iter.val == latest ->
 
1187
    get_latest_val(Iter, Key);
 
1188
get_val(Iter, Key) when Iter#iter.val == checkpoint ->
 
1189
    get_checkpoint_val(Iter, Key).
 
1190
 
 
1191
get_latest_val(Iter, Key) when Iter#iter.source == table ->
 
1192
    retainer_get(Iter#iter.main_tab, Key);
 
1193
get_latest_val(Iter, Key) when Iter#iter.source == retainer ->
 
1194
    DeleteOid = {Iter#iter.tab_name, Key},
 
1195
    [DeleteOid | retainer_get(Iter#iter.main_tab, Key)].
 
1196
 
 
1197
get_checkpoint_val(Iter, Key) when Iter#iter.source == table ->
 
1198
    retainer_get(Iter#iter.main_tab, Key);
 
1199
get_checkpoint_val(Iter, Key) when Iter#iter.source == retainer ->
 
1200
    DeleteOid = {Iter#iter.tab_name, Key},
 
1201
    case retainer_get(Iter#iter.retainer_tab, Key) of
 
1202
        [{_, _, []}] -> [DeleteOid];
 
1203
        [{_, _, Records}] -> [DeleteOid | Records]
 
1204
    end.
 
1205
 
 
1206
%%%%%%%%%%%%%%%%%%%%%%%%%%%
 
1207
%% System upgrade
 
1208
 
 
1209
system_continue(_Parent, _Debug, Cp) ->
 
1210
    retainer_loop(Cp).
 
1211
 
 
1212
system_terminate(_Reason, _Parent,_Debug, Cp) ->
 
1213
    do_stop(Cp).
 
1214
 
 
1215
system_code_change(Cp, _Module, _OldVsn, _Extra) ->
 
1216
    {ok, Cp}.
 
1217
 
 
1218
convert_cp_record(Cp) when record(Cp, checkpoint) ->
 
1219
    ROD = 
 
1220
        case Cp#checkpoint.ram_overrides_dump of
 
1221
            true -> Cp#checkpoint.min ++ Cp#checkpoint.max;
 
1222
            false -> []
 
1223
        end,
 
1224
 
 
1225
    {ok, #checkpoint_args{name = Cp#checkpoint.name,
 
1226
                          allow_remote = Cp#checkpoint.name,
 
1227
                          ram_overrides_dump = ROD,
 
1228
                          nodes = Cp#checkpoint.nodes,
 
1229
                          node = Cp#checkpoint.node,
 
1230
                          now = Cp#checkpoint.now,
 
1231
                          cookie = ?unique_cookie,
 
1232
                          min = Cp#checkpoint.min,
 
1233
                          max = Cp#checkpoint.max,
 
1234
                          pending_tab = Cp#checkpoint.pending_tab,
 
1235
                          wait_for_old = Cp#checkpoint.wait_for_old,
 
1236
                          is_activated = Cp#checkpoint.is_activated,
 
1237
                          ignore_new = Cp#checkpoint.ignore_new,
 
1238
                          retainers = Cp#checkpoint.retainers,
 
1239
                          iterators = Cp#checkpoint.iterators,
 
1240
                          supervisor = Cp#checkpoint.supervisor,
 
1241
                          pid = Cp#checkpoint.pid
 
1242
                         }};
 
1243
convert_cp_record(Cp) when record(Cp, checkpoint_args) ->
 
1244
    AllTabs = Cp#checkpoint_args.min ++ Cp#checkpoint_args.max,
 
1245
    ROD = case Cp#checkpoint_args.ram_overrides_dump of
 
1246
              [] -> 
 
1247
                  false;
 
1248
              AllTabs -> 
 
1249
                  true;
 
1250
              _ -> 
 
1251
                  error
 
1252
          end,
 
1253
    if
 
1254
        ROD == error ->
 
1255
            {error, {"Old node cannot handle new checkpoint protocol",
 
1256
                     ram_overrides_dump}};
 
1257
        true ->
 
1258
            {ok, #checkpoint{name = Cp#checkpoint_args.name,
 
1259
                             allow_remote = Cp#checkpoint_args.name,
 
1260
                             ram_overrides_dump = ROD,
 
1261
                             nodes = Cp#checkpoint_args.nodes,
 
1262
                             node = Cp#checkpoint_args.node,
 
1263
                             now = Cp#checkpoint_args.now,
 
1264
                             min = Cp#checkpoint_args.min,
 
1265
                             max = Cp#checkpoint_args.max,
 
1266
                             pending_tab = Cp#checkpoint_args.pending_tab,
 
1267
                             wait_for_old = Cp#checkpoint_args.wait_for_old,
 
1268
                             is_activated = Cp#checkpoint_args.is_activated,
 
1269
                             ignore_new = Cp#checkpoint_args.ignore_new,
 
1270
                             retainers = Cp#checkpoint_args.retainers,
 
1271
                             iterators = Cp#checkpoint_args.iterators,
 
1272
                             supervisor = Cp#checkpoint_args.supervisor,
 
1273
                             pid = Cp#checkpoint_args.pid
 
1274
                            }}
 
1275
    end.
 
1276
 
 
1277
%%%%%%%%%%%%%%%%%%%%%%%%%%
 
1278
 
 
1279
val(Var) ->
 
1280
    case ?catch_val(Var) of
 
1281
        {'EXIT', _ReASoN_} -> mnesia_lib:other_val(Var, _ReASoN_); 
 
1282
        _VaLuE_ -> _VaLuE_ 
 
1283
    end.
 
1284