1
%% ``The contents of this file are subject to the Erlang Public License,
2
%% Version 1.1, (the "License"); you may not use this file except in
3
%% compliance with the License. You should have received a copy of the
4
%% Erlang Public License along with this software. If not, it can be
5
%% retrieved via the world wide web at http://www.erlang.org/.
7
%% Software distributed under the License is distributed on an "AS IS"
8
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
9
%% the License for the specific language governing rights and limitations
12
%% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
13
%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
14
%% AB. All Rights Reserved.''
16
%% $Id: mnesia_locker.erl,v 1.2 2009/07/01 15:45:40 kostis Exp $
18
-module(mnesia_locker).
30
receive_release_tid_acc/2,
43
%% sys callback functions
44
-export([system_continue/3,
49
-include("mnesia.hrl").
50
-import(mnesia_lib, [dbg_out/2, error/2, verbose/2]).
52
-define(dbg(S,V), ok).
53
%-define(dbg(S,V), dbg_out("~p:~p: " ++ S, [?MODULE, ?LINE] ++ V)).
55
-define(ALL, '______WHOLETABLE_____').
56
-define(STICK, '______STICK_____').
57
-define(GLOBAL, '______GLOBAL_____').
59
-record(state, {supervisor}).
61
-record(queue, {oid, tid, op, pid, lucky}).
63
%% mnesia_held_locks: contain {Oid, Op, Tid} entries (bag)
64
-define(match_oid_held_locks(Oid), {Oid, '_', '_'}).
65
%% mnesia_tid_locks: contain {Tid, Oid, Op} entries (bag)
66
-define(match_oid_tid_locks(Tid), {Tid, '_', '_'}).
67
%% mnesia_sticky_locks: contain {Oid, Node} entries and {Tab, Node} entries (set)
68
-define(match_oid_sticky_locks(Oid),{Oid, '_'}).
69
%% mnesia_lock_queue: contain {queue, Oid, Tid, Op, ReplyTo, WaitForTid} entries (ordered_set)
70
-define(match_oid_lock_queue(Oid), #queue{oid=Oid, tid='_', op = '_', pid = '_', lucky = '_'}).
71
%% mnesia_lock_counter: {{write, Tab}, Number} &&
72
%% {{read, Tab}, Number} entries (set)
75
mnesia_monitor:start_proc(?MODULE, ?MODULE, init, [self()]).
78
register(?MODULE, self()),
79
process_flag(trap_exit, true),
80
proc_lib:init_ack(Parent, {ok, self()}),
81
loop(#state{supervisor = Parent}).
84
case ?catch_val(Var) of
85
{'EXIT', _ReASoN_} -> mnesia_lib:other_val(Var, _ReASoN_);
90
From ! {?MODULE, node(), R}.
92
l_request(Node, X, Store) ->
93
{?MODULE, Node} ! {self(), X},
94
l_req_rec(Node, Store).
96
l_req_rec(Node, Store) ->
97
?ets_insert(Store, {nodes, Node}),
99
{?MODULE, Node, {switch, Node2, Req}} ->
100
?ets_insert(Store, {nodes, Node2}),
101
{?MODULE, Node2} ! Req,
102
{switch, Node2, Req};
103
{?MODULE, Node, Reply} ->
105
{mnesia_down, Node} ->
106
{not_granted, {node_not_running, Node}}
110
?MODULE ! {release_tid, Tid}.
112
async_release_tid(Nodes, Tid) ->
113
rpc:abcast(Nodes, ?MODULE, {release_tid, Tid}).
115
send_release_tid(Nodes, Tid) ->
116
rpc:abcast(Nodes, ?MODULE, {self(), {sync_release_tid, Tid}}).
118
receive_release_tid_acc([Node | Nodes], Tid) ->
120
{?MODULE, Node, {tid_released, Tid}} ->
121
receive_release_tid_acc(Nodes, Tid);
122
{mnesia_down, Node} ->
123
receive_release_tid_acc(Nodes, Tid)
125
receive_release_tid_acc([], _Tid) ->
130
{From, {write, Tid, Oid}} ->
131
try_sticky_lock(Tid, write, From, Oid),
134
%% If Key == ?ALL it's a request to lock the entire table
137
{From, {read, Tid, Oid}} ->
138
try_sticky_lock(Tid, read, From, Oid),
141
%% Really do a read, but get hold of a write lock
142
%% used by mnesia:wread(Oid).
144
{From, {read_write, Tid, Oid}} ->
145
try_sticky_lock(Tid, read_write, From, Oid),
148
%% Tid has somehow terminated, clear up everything
149
%% and pass locks on to queued processes.
150
%% This is the purpose of the mnesia_tid_locks table
152
{release_tid, Tid} ->
156
%% stick lock, first tries this to the where_to_read Node
157
{From, {test_set_sticky, Tid, {Tab, _} = Oid, Lock}} ->
158
case ?ets_lookup(mnesia_sticky_locks, Tab) of
160
reply(From, not_stuck),
162
[{_,Node}] when Node == node() ->
163
%% Lock is stuck here, see now if we can just set
164
%% a regular write lock
165
try_lock(Tid, Lock, From, Oid),
168
reply(From, {stuck_elsewhere, Node}),
172
%% If test_set_sticky fails, we send this to all nodes
173
%% after aquiring a real write lock on Oid
175
{stick, {Tab, _}, N} ->
176
?ets_insert(mnesia_sticky_locks, {Tab, N}),
179
%% The caller which sends this message, must have first
180
%% aquired a write lock on the entire table
182
?ets_delete(mnesia_sticky_locks, Tab),
185
{From, {ix_read, Tid, Tab, IxKey, Pos}} ->
186
case catch mnesia_index:get_index_table(Tab, Pos) of
188
reply(From, {not_granted, {no_exists, Tab, {index, [Pos]}}}),
191
Rk = mnesia_lib:elems(2,mnesia_index:db_get(Index, IxKey)),
193
case ?ets_lookup(mnesia_sticky_locks, Tab) of
195
set_read_lock_on_all_keys(Tid, From,Tab,Rk,Rk,
198
[{_,N}] when N == node() ->
199
set_read_lock_on_all_keys(Tid, From,Tab,Rk,Rk,
203
Req = {From, {ix_read, Tid, Tab, IxKey, Pos}},
204
From ! {?MODULE, node(), {switch, N, Req}},
209
{From, {sync_release_tid, Tid}} ->
211
reply(From, {tid_released, Tid}),
214
{release_remote_non_pending, Node, Pending} ->
215
release_remote_non_pending(Node, Pending),
216
mnesia_monitor:mnesia_down(?MODULE, Node),
219
{'EXIT', Pid, _} when Pid == State#state.supervisor ->
222
{system, From, Msg} ->
223
verbose("~p got {system, ~p, ~p}~n", [?MODULE, From, Msg]),
224
Parent = State#state.supervisor,
225
sys:handle_system_msg(Msg, From, Parent, ?MODULE, [], State);
228
error("~p got unexpected message: ~p~n", [?MODULE, Msg]),
232
set_lock(Tid, Oid, Op) ->
233
?dbg("Granted ~p ~p ~p~n", [Tid,Oid,Op]),
234
?ets_insert(mnesia_held_locks, {Oid, Op, Tid}),
235
?ets_insert(mnesia_tid_locks, {Tid, Oid, Op}).
237
%%%%%%%%%%%%%%%%%%%%%%%%%%%
240
try_sticky_lock(Tid, Op, Pid, {Tab, _} = Oid) ->
241
case ?ets_lookup(mnesia_sticky_locks, Tab) of
243
try_lock(Tid, Op, Pid, Oid);
244
[{_,N}] when N == node() ->
245
try_lock(Tid, Op, Pid, Oid);
247
Req = {Pid, {Op, Tid, Oid}},
248
Pid ! {?MODULE, node(), {switch, N, Req}}
251
try_lock(Tid, read_write, Pid, Oid) ->
252
try_lock(Tid, read_write, read, write, Pid, Oid);
253
try_lock(Tid, Op, Pid, Oid) ->
254
try_lock(Tid, Op, Op, Op, Pid, Oid).
256
try_lock(Tid, Op, SimpleOp, Lock, Pid, Oid) ->
257
case can_lock(Tid, Lock, Oid, {no, bad_luck}) of
259
Reply = grant_lock(Tid, SimpleOp, Lock, Oid),
262
C = #cyclic{op = SimpleOp, lock = Lock, oid = Oid, lucky = Lucky},
263
?dbg("Rejected ~p ~p ~p ~p ~n", [Tid, Oid, Lock, Lucky]),
264
reply(Pid, {not_granted, C});
266
?dbg("Queued ~p ~p ~p ~p ~n", [Tid, Oid, Lock, Lucky]),
267
%% Append to queue: Nice place for trace output
268
?ets_insert(mnesia_lock_queue,
269
#queue{oid = Oid, tid = Tid, op = Op,
270
pid = Pid, lucky = Lucky}),
271
?ets_insert(mnesia_tid_locks, {Tid, Oid, {queued, Op}})
274
grant_lock(Tid, read, Lock, {Tab, Key})
275
when Key /= ?ALL, Tab /= ?GLOBAL ->
276
case node(Tid#tid.pid) == node() of
278
set_lock(Tid, {Tab, Key}, Lock),
279
{granted, lookup_in_client};
281
case catch mnesia_lib:db_get(Tab, Key) of %% lookup as well
283
%% Table has been deleted from this node,
284
%% restart the transaction.
285
C = #cyclic{op = read, lock = Lock, oid = {Tab, Key},
289
set_lock(Tid, {Tab, Key}, Lock),
293
grant_lock(Tid, read, Lock, Oid) ->
294
set_lock(Tid, Oid, Lock),
296
grant_lock(Tid, write, Lock, Oid) ->
297
set_lock(Tid, Oid, Lock),
300
%% 1) Impose an ordering on all transactions favour old (low tid) transactions
301
%% newer (higher tid) transactions may never wait on older ones,
302
%% 2) When releasing the tids from the queue always begin with youngest (high tid)
303
%% because of 1) it will avoid the deadlocks.
304
%% 3) TabLocks is the problem :-) They should not starve and not deadlock
305
%% handle tablocks in queue as they had locks on unlocked records.
307
can_lock(Tid, read, {Tab, Key}, AlreadyQ) when Key /= ?ALL ->
308
%% The key is bound, no need for the other BIF
310
ObjLocks = ?ets_match_object(mnesia_held_locks, {Oid, write, '_'}),
311
TabLocks = ?ets_match_object(mnesia_held_locks, {{Tab, ?ALL}, write, '_'}),
312
check_lock(Tid, Oid, ObjLocks, TabLocks, yes, AlreadyQ, read);
314
can_lock(Tid, read, Oid, AlreadyQ) -> % Whole tab
315
Tab = element(1, Oid),
316
ObjLocks = ?ets_match_object(mnesia_held_locks, {{Tab, '_'}, write, '_'}),
317
check_lock(Tid, Oid, ObjLocks, [], yes, AlreadyQ, read);
319
can_lock(Tid, write, {Tab, Key}, AlreadyQ) when Key /= ?ALL ->
321
ObjLocks = ?ets_lookup(mnesia_held_locks, Oid),
322
TabLocks = ?ets_lookup(mnesia_held_locks, {Tab, ?ALL}),
323
check_lock(Tid, Oid, ObjLocks, TabLocks, yes, AlreadyQ, write);
325
can_lock(Tid, write, Oid, AlreadyQ) -> % Whole tab
326
Tab = element(1, Oid),
327
ObjLocks = ?ets_match_object(mnesia_held_locks, ?match_oid_held_locks({Tab, '_'})),
328
check_lock(Tid, Oid, ObjLocks, [], yes, AlreadyQ, write).
330
%% Check held locks for conflicting locks
331
check_lock(Tid, Oid, [Lock | Locks], TabLocks, X, AlreadyQ, Type) ->
332
case element(3, Lock) of
334
check_lock(Tid, Oid, Locks, TabLocks, X, AlreadyQ, Type);
335
WaitForTid when WaitForTid > Tid -> % Important order
336
check_lock(Tid, Oid, Locks, TabLocks, {queue, WaitForTid}, AlreadyQ, Type);
337
WaitForTid when Tid#tid.pid == WaitForTid#tid.pid ->
338
dbg_out("Spurious lock conflict ~w ~w: ~w -> ~w~n",
339
[Oid, Lock, Tid, WaitForTid]),
340
%% check_lock(Tid, Oid, Locks, TabLocks, {queue, WaitForTid}, AlreadyQ);
341
%% BUGBUG Fix this if possible
347
check_lock(_, _, [], [], X, {queue, bad_luck}, _) ->
348
X; %% The queue should be correct already no need to check it again
350
check_lock(_, _, [], [], X = {queue, _Tid}, _AlreadyQ, _) ->
353
check_lock(Tid, Oid, [], [], X, AlreadyQ, Type) ->
357
check_queue(Tid, Tab, X, AlreadyQ);
359
%% hmm should be solvable by a clever select expr but not today...
360
check_queue(Tid, Tab, X, AlreadyQ);
362
%% If there is a queue on that object, read_lock shouldn't be granted
363
ObjLocks = ets:lookup(mnesia_lock_queue, Oid),
364
Greatest = max(ObjLocks),
367
check_queue(Tid, Tab, X, AlreadyQ);
368
ObjL when Tid > ObjL ->
369
{no, ObjL}; %% Starvation Preemption (write waits for read)
371
check_queue(Tid, Tab, {queue, ObjL}, AlreadyQ)
375
check_lock(Tid, Oid, [], TabLocks, X, AlreadyQ, Type) ->
376
check_lock(Tid, Oid, TabLocks, [], X, AlreadyQ, Type).
378
%% Check queue for conflicting locks
379
%% Assume that all queued locks belongs to other tid's
381
check_queue(Tid, Tab, X, AlreadyQ) ->
382
TabLocks = ets:lookup(mnesia_lock_queue, {Tab,?ALL}),
383
Greatest = max(TabLocks),
389
WaitForTid when WaitForTid#queue.tid > Tid -> % Important order
393
{no, bad_luck} -> {no, WaitForTid};
395
erlang:error({mnesia_locker, assert, AlreadyQ})
404
max([H|R], Tid) when H#queue.tid > Tid ->
411
%% We can't queue the ixlock requests since it
412
%% becomes to complivated for little me :-)
413
%% If we encounter an object with a wlock we reject the
414
%% entire lock request
416
%% BUGBUG: this is actually a bug since we may starve
418
set_read_lock_on_all_keys(Tid, From, Tab, [RealKey | Tail], Orig, Ack) ->
419
Oid = {Tab, RealKey},
420
case can_lock(Tid, read, Oid, {no, bad_luck}) of
422
{granted, Val} = grant_lock(Tid, read, read, Oid),
423
case opt_lookup_in_client(Val, Oid, read) of % Ought to be invoked
424
C when record(C, cyclic) -> % in the client
425
reply(From, {not_granted, C});
427
Ack2 = lists:append(Val2, Ack),
428
set_read_lock_on_all_keys(Tid, From, Tab, Tail, Orig, Ack2)
431
C = #cyclic{op = read, lock = read, oid = Oid, lucky = Lucky},
432
reply(From, {not_granted, C});
434
C = #cyclic{op = read, lock = read, oid = Oid, lucky = Lucky},
435
reply(From, {not_granted, C})
437
set_read_lock_on_all_keys(_Tid, From, _Tab, [], Orig, Ack) ->
438
reply(From, {granted, Ack, Orig}).
440
%%%%%%%%%%%%%%%%%%%%%%%%%%%
443
%% Release remote non-pending nodes
444
release_remote_non_pending(Node, Pending) ->
445
%% Clear the mnesia_sticky_locks table first, to avoid
446
%% unnecessary requests to the failing node
447
?ets_match_delete(mnesia_sticky_locks, {'_' , Node}),
449
%% Then we have to release all locks held by processes
450
%% running at the failed node and also simply remove all
451
%% queue'd requests back to the failed node
453
AllTids = ?ets_match(mnesia_tid_locks, {'$1', '_', '_'}),
454
Tids = [T || [T] <- AllTids, Node == node(T#tid.pid), not lists:member(T, Pending)],
455
do_release_tids(Tids).
457
do_release_tids([Tid | Tids]) ->
459
do_release_tids(Tids);
460
do_release_tids([]) ->
463
do_release_tid(Tid) ->
464
Locks = ?ets_lookup(mnesia_tid_locks, Tid),
465
?dbg("Release ~p ~p ~n", [Tid, Locks]),
466
?ets_delete(mnesia_tid_locks, Tid),
467
release_locks(Locks),
468
%% Removed queued locks which has had locks
469
UniqueLocks = keyunique(lists:sort(Locks),[]),
470
rearrange_queue(UniqueLocks).
472
keyunique([{_Tid, Oid, _Op}|R], Acc = [{_, Oid, _}|_]) ->
474
keyunique([H|R], Acc) ->
475
keyunique(R, [H|Acc]);
476
keyunique([], Acc) ->
479
release_locks([Lock | Locks]) ->
481
release_locks(Locks);
485
release_lock({Tid, Oid, {queued, _}}) ->
486
?ets_match_delete(mnesia_lock_queue,
487
#queue{oid=Oid, tid = Tid, op = '_',
488
pid = '_', lucky = '_'});
489
release_lock({Tid, Oid, Op}) ->
492
?ets_delete(mnesia_held_locks, Oid);
494
?ets_match_delete(mnesia_held_locks, {Oid, Op, Tid})
497
rearrange_queue([{_Tid, {Tab, Key}, _} | Locks]) ->
501
ets:lookup(mnesia_lock_queue, {Tab, ?ALL}) ++
502
ets:lookup(mnesia_lock_queue, {Tab, Key}),
507
Sorted = lists:reverse(lists:keysort(#queue.tid, Queue)),
508
try_waiters_obj(Sorted)
511
Pat = ?match_oid_lock_queue({Tab, '_'}),
512
Queue = ?ets_match_object(mnesia_lock_queue, Pat),
513
Sorted = lists:reverse(lists:keysort(#queue.tid, Queue)),
514
try_waiters_tab(Sorted)
516
?dbg("RearrQ ~p~n", [Queue]),
517
rearrange_queue(Locks);
518
rearrange_queue([]) ->
521
try_waiters_obj([W | Waiters]) ->
522
case try_waiter(W) of
526
try_waiters_obj(Waiters)
528
try_waiters_obj([]) ->
531
try_waiters_tab([W | Waiters]) ->
534
case try_waiter(W) of
538
try_waiters_tab(Waiters)
541
case try_waiter(W) of
543
Rest = key_delete_all(Oid, #queue.oid, Waiters),
544
try_waiters_tab(Rest);
546
try_waiters_tab(Waiters)
549
try_waiters_tab([]) ->
552
try_waiter({queue, Oid, Tid, read_write, ReplyTo, _}) ->
553
try_waiter(Oid, read_write, read, write, ReplyTo, Tid);
554
try_waiter({queue, Oid, Tid, Op, ReplyTo, _}) ->
555
try_waiter(Oid, Op, Op, Op, ReplyTo, Tid).
557
try_waiter(Oid, Op, SimpleOp, Lock, ReplyTo, Tid) ->
558
case can_lock(Tid, Lock, Oid, {queue, bad_luck}) of
560
%% Delete from queue: Nice place for trace output
561
?ets_match_delete(mnesia_lock_queue,
562
#queue{oid=Oid, tid = Tid, op = Op,
563
pid = ReplyTo, lucky = '_'}),
564
Reply = grant_lock(Tid, SimpleOp, Lock, Oid),
565
ReplyTo ! {?MODULE, node(), Reply},
568
?dbg("Keep ~p ~p ~p ~p~n", [Tid, Oid, Lock, _Why]),
569
queued; % Keep waiter in queue
571
C = #cyclic{op = SimpleOp, lock = Lock, oid = Oid, lucky = Lucky},
572
verbose("** WARNING ** Restarted transaction, possible deadlock in lock queue ~w: cyclic = ~w~n",
574
?ets_match_delete(mnesia_lock_queue,
575
#queue{oid=Oid, tid = Tid, op = Op,
576
pid = ReplyTo, lucky = '_'}),
577
Reply = {not_granted, C},
578
ReplyTo ! {?MODULE, node(), Reply},
582
key_delete_all(Key, Pos, TupleList) ->
583
key_delete_all(Key, Pos, TupleList, []).
584
key_delete_all(Key, Pos, [H|T], Ack) when element(Pos, H) == Key ->
585
key_delete_all(Key, Pos, T, Ack);
586
key_delete_all(Key, Pos, [H|T], Ack) ->
587
key_delete_all(Key, Pos, T, [H|Ack]);
588
key_delete_all(_, _, [], Ack) ->
592
%% ********************* end server code ********************
593
%% The following code executes at the client side of a transactions
595
mnesia_down(N, Pending) ->
596
case whereis(?MODULE) of
598
%% Takes care of mnesia_down's in early startup
599
mnesia_monitor:mnesia_down(?MODULE, N);
601
%% Syncronously call needed in order to avoid
602
%% race with mnesia_tm's coordinator processes
603
%% that may restart and acquire new locks.
604
%% mnesia_monitor ensures the sync.
605
Pid ! {release_remote_non_pending, N, Pending}
608
%% Aquire a write lock, but do a read, used by
611
rwlock(Tid, Store, Oid) ->
613
case val({Tab, where_to_read}) of
615
mnesia:abort({no_exists, Tab});
618
case need_lock(Store, Tab, Key, Lock) of
621
Res = get_rwlocks_on_nodes(Ns, Ns, Node, Store, Tid, Oid),
622
?ets_insert(Store, {{locks, Tab, Key}, Lock}),
631
dirty_rpc(Node, Tab, Key, Lock)
636
get_rwlocks_on_nodes([Node | Tail], Orig, Node, Store, Tid, Oid) ->
637
Op = {self(), {read_write, Tid, Oid}},
638
{?MODULE, Node} ! Op,
639
?ets_insert(Store, {nodes, Node}),
641
get_rwlocks_on_nodes(Tail, Orig, Node, Store, Tid, Oid);
642
get_rwlocks_on_nodes([Node | Tail], Orig, OtherNode, Store, Tid, Oid) ->
643
Op = {self(), {write, Tid, Oid}},
644
{?MODULE, Node} ! Op,
646
?ets_insert(Store, {nodes, Node}),
647
get_rwlocks_on_nodes(Tail, Orig, OtherNode, Store, Tid, Oid);
648
get_rwlocks_on_nodes([], Orig, _Node, Store, _Tid, Oid) ->
649
receive_wlocks(Orig, read_write_lock, Store, Oid).
651
%% Return a list of nodes or abort transaction
652
%% WE also insert any additional where_to_write nodes
653
%% in the local store under the key == nodes
656
Nodes = ?catch_val({Tab, where_to_write}),
659
_ -> mnesia:abort({no_exists, Tab})
662
%% aquire a sticky wlock, a sticky lock is a lock
663
%% which remains at this node after the termination of the
666
sticky_wlock(Tid, Store, Oid) ->
667
sticky_lock(Tid, Store, Oid, write).
669
sticky_rwlock(Tid, Store, Oid) ->
670
sticky_lock(Tid, Store, Oid, read_write).
672
sticky_lock(Tid, Store, {Tab, Key} = Oid, Lock) ->
673
N = val({Tab, where_to_read}),
676
case need_lock(Store, Tab, Key, write) of
678
do_sticky_lock(Tid, Store, Oid, Lock);
680
dirty_sticky_lock(Tab, Key, [N], Lock)
683
mnesia:abort({not_local, Tab})
686
do_sticky_lock(Tid, Store, {Tab, Key} = Oid, Lock) ->
687
?MODULE ! {self(), {test_set_sticky, Tid, Oid, Lock}},
689
{?MODULE, _N, granted} ->
690
?ets_insert(Store, {{locks, Tab, Key}, write}),
692
{?MODULE, _N, {granted, Val}} -> %% for rwlocks
693
case opt_lookup_in_client(Val, Oid, write) of
694
C when record(C, cyclic) ->
697
?ets_insert(Store, {{locks, Tab, Key}, write}),
700
{?MODULE, _N, {not_granted, Reason}} ->
701
exit({aborted, Reason});
702
{?MODULE, N, not_stuck} ->
703
not_stuck(Tid, Store, Tab, Key, Oid, Lock, N),
704
dirty_sticky_lock(Tab, Key, [N], Lock);
706
exit({aborted, {node_not_running, N}});
707
{?MODULE, N, {stuck_elsewhere, _N2}} ->
708
stuck_elsewhere(Tid, Store, Tab, Key, Oid, Lock),
709
dirty_sticky_lock(Tab, Key, [N], Lock)
712
not_stuck(Tid, Store, Tab, _Key, Oid, _Lock, N) ->
713
rlock(Tid, Store, {Tab, ?ALL}), %% needed?
714
wlock(Tid, Store, Oid), %% perfect sync
715
wlock(Tid, Store, {Tab, ?STICK}), %% max one sticker/table
716
Ns = val({Tab, where_to_write}),
717
rpc:abcast(Ns, ?MODULE, {stick, Oid, N}).
719
stuck_elsewhere(Tid, Store, Tab, _Key, Oid, _Lock) ->
720
rlock(Tid, Store, {Tab, ?ALL}), %% needed?
721
wlock(Tid, Store, Oid), %% perfect sync
722
wlock(Tid, Store, {Tab, ?STICK}), %% max one sticker/table
723
Ns = val({Tab, where_to_write}),
724
rpc:abcast(Ns, ?MODULE, {unstick, Tab}).
726
dirty_sticky_lock(Tab, Key, Nodes, Lock) ->
728
Lock == read_write ->
729
mnesia_lib:db_get(Tab, Key);
738
sticky_wlock_table(Tid, Store, Tab) ->
739
sticky_lock(Tid, Store, {Tab, ?ALL}, write).
741
%% aquire a wlock on Oid
742
%% We store a {Tabname, write, Tid} in all locktables
743
%% on all nodes containing a copy of Tabname
744
%% We also store an item {{locks, Tab, Key}, write} in the
745
%% local store when we have aquired the lock.
747
wlock(Tid, Store, Oid) ->
749
case need_lock(Store, Tab, Key, write) of
752
Op = {self(), {write, Tid, Oid}},
753
?ets_insert(Store, {{locks, Tab, Key}, write}),
754
get_wlocks_on_nodes(Ns, Ns, Store, Op, Oid);
755
no when Key /= ?ALL, Tab /= ?GLOBAL ->
761
wlock_table(Tid, Store, Tab) ->
762
wlock(Tid, Store, {Tab, ?ALL}).
764
%% Write lock even if the table does not exist
766
wlock_no_exist(Tid, Store, Tab, Ns) ->
768
Op = {self(), {write, Tid, Oid}},
769
get_wlocks_on_nodes(Ns, Ns, Store, Op, Oid).
771
need_lock(Store, Tab, Key, LockPattern) ->
772
TabL = ?ets_match_object(Store, {{locks, Tab, ?ALL}, LockPattern}),
775
KeyL = ?ets_match_object(Store, {{locks, Tab, Key}, LockPattern}),
786
add_debug(Node) -> % Use process dictionary for debug info
787
case get(mnesia_wlock_nodes) of
789
put(mnesia_wlock_nodes, [Node]);
791
put(mnesia_wlock_nodes, [Node|NodeList])
795
case get(mnesia_wlock_nodes) of
796
undefined -> % Shouldn't happen
799
erase(mnesia_wlock_nodes);
801
put(mnesia_wlock_nodes, lists:delete(Node, List))
804
%% We first send lock requests to the lockmanagers on all
805
%% nodes holding a copy of the table
807
get_wlocks_on_nodes([Node | Tail], Orig, Store, Request, Oid) ->
808
{?MODULE, Node} ! Request,
809
?ets_insert(Store, {nodes, Node}),
811
get_wlocks_on_nodes(Tail, Orig, Store, Request, Oid);
812
get_wlocks_on_nodes([], Orig, Store, _Request, Oid) ->
813
receive_wlocks(Orig, Orig, Store, Oid).
815
receive_wlocks([Node | Tail], Res, Store, Oid) ->
817
{?MODULE, Node, granted} ->
819
receive_wlocks(Tail, Res, Store, Oid);
820
{?MODULE, Node, {granted, Val}} -> %% for rwlocks
822
case opt_lookup_in_client(Val, Oid, write) of
823
C when record(C, cyclic) ->
824
flush_remaining(Tail, Node, {aborted, C});
826
receive_wlocks(Tail, Val2, Store, Oid)
828
{?MODULE, Node, {not_granted, Reason}} ->
830
Reason1 = {aborted, Reason},
831
flush_remaining(Tail, Node, Reason1);
832
{mnesia_down, Node} ->
834
Reason1 = {aborted, {node_not_running, Node}},
835
flush_remaining(Tail, Node, Reason1);
836
{?MODULE, Node, {switch, Node2, Req}} -> %% for rwlocks
839
?ets_insert(Store, {nodes, Node2}),
840
{?MODULE, Node2} ! Req,
841
receive_wlocks([Node2 | Tail], Res, Store, Oid)
844
receive_wlocks([], Res, _Store, _Oid) ->
847
flush_remaining([], _SkipNode, Res) ->
849
flush_remaining([SkipNode | Tail ], SkipNode, Res) ->
851
flush_remaining(Tail, SkipNode, Res);
852
flush_remaining([Node | Tail], SkipNode, Res) ->
854
{?MODULE, Node, _} ->
856
flush_remaining(Tail, SkipNode, Res);
857
{mnesia_down, Node} ->
859
flush_remaining(Tail, SkipNode, {aborted, {node_not_running, Node}})
862
opt_lookup_in_client(lookup_in_client, Oid, Lock) ->
864
case catch mnesia_lib:db_get(Tab, Key) of
866
%% Table has been deleted from this node,
867
%% restart the transaction.
868
#cyclic{op = read, lock = Lock, oid = Oid, lucky = nowhere};
872
opt_lookup_in_client(Val, _Oid, _Lock) ->
875
return_granted_or_nodes({_, ?ALL} , Nodes) -> Nodes;
876
return_granted_or_nodes({?GLOBAL, _}, Nodes) -> Nodes;
877
return_granted_or_nodes(_ , _Nodes) -> granted.
879
%% We store a {Tab, read, From} item in the
880
%% locks table on the node where we actually do pick up the object
881
%% and we also store an item {lock, Oid, read} in our local store
882
%% so that we can release any locks we hold when we commit.
883
%% This function not only aquires a read lock, but also reads the object
885
%% Oid's are always {Tab, Key} tuples
886
rlock(Tid, Store, Oid) ->
888
case val({Tab, where_to_read}) of
890
mnesia:abort({no_exists, Tab});
892
case need_lock(Store, Tab, Key, '_') of
894
R = l_request(Node, {read, Tid, Oid}, Store),
895
rlock_get_reply(Node, Store, Oid, R);
903
dirty_rpc(Node, Tab, Key, read)
908
dirty_rpc(nowhere, Tab, Key, _Lock) ->
909
mnesia:abort({no_exists, {Tab, Key}});
910
dirty_rpc(Node, _Tab, ?ALL, _Lock) ->
912
dirty_rpc(Node, ?GLOBAL, _Key, _Lock) ->
914
dirty_rpc(Node, Tab, Key, Lock) ->
916
case rpc:call(Node, mnesia_lib, db_get, Args) of
918
case val({Tab, where_to_read}) of
920
ErrorTag = mnesia_lib:dirty_rpc_error_tag(Reason),
921
mnesia:abort({ErrorTag, Args});
923
%% Table has been deleted from the node,
924
%% restart the transaction.
925
C = #cyclic{op = read, lock = Lock, oid = {Tab, Key}, lucky = nowhere},
932
rlock_get_reply(Node, Store, Oid, {granted, V}) ->
934
?ets_insert(Store, {{locks, Tab, Key}, read}),
935
?ets_insert(Store, {nodes, Node}),
936
case opt_lookup_in_client(V, Oid, read) of
937
C when record(C, cyclic) ->
942
rlock_get_reply(Node, Store, Oid, granted) ->
944
?ets_insert(Store, {{locks, Tab, Key}, read}),
945
?ets_insert(Store, {nodes, Node}),
946
return_granted_or_nodes(Oid, [Node]);
947
rlock_get_reply(Node, Store, Tab, {granted, V, RealKeys}) ->
948
L = fun(K) -> ?ets_insert(Store, {{locks, Tab, K}, read}) end,
949
lists:foreach(L, RealKeys),
950
?ets_insert(Store, {nodes, Node}),
952
rlock_get_reply(_Node, _Store, _Oid, {not_granted , Reason}) ->
953
exit({aborted, Reason});
955
rlock_get_reply(_Node, Store, Oid, {switch, N2, Req}) ->
956
?ets_insert(Store, {nodes, N2}),
958
rlock_get_reply(N2, Store, Oid, l_req_rec(N2, Store)).
961
rlock_table(Tid, Store, Tab) ->
962
rlock(Tid, Store, {Tab, ?ALL}).
964
ixrlock(Tid, Store, Tab, IxKey, Pos) ->
965
case val({Tab, where_to_read}) of
967
mnesia:abort({no_exists, Tab});
969
R = l_request(Node, {ix_read, Tid, Tab, IxKey, Pos}, Store),
970
rlock_get_reply(Node, Store, Tab, R)
973
%% Grabs the locks or exits
974
global_lock(Tid, Store, Item, write, Ns) ->
975
Oid = {?GLOBAL, Item},
976
Op = {self(), {write, Tid, Oid}},
977
get_wlocks_on_nodes(Ns, Ns, Store, Op, Oid);
978
global_lock(Tid, Store, Item, read, Ns) ->
979
Oid = {?GLOBAL, Item},
980
send_requests(Ns, {read, Tid, Oid}),
981
rec_requests(Ns, Oid, Store),
984
send_requests([Node | Nodes], X) ->
985
{?MODULE, Node} ! {self(), X},
986
send_requests(Nodes, X);
987
send_requests([], _X) ->
990
rec_requests([Node | Nodes], Oid, Store) ->
991
Res = l_req_rec(Node, Store),
992
case catch rlock_get_reply(Node, Store, Oid, Res) of
994
flush_remaining(Nodes, Node, Reason);
996
rec_requests(Nodes, Oid, Store)
998
rec_requests([], _Oid, _Store) ->
1002
?ets_match_object(mnesia_held_locks, '_').
1005
Q = ?ets_match_object(mnesia_lock_queue, '_'),
1006
[{Oid, Op, Pid, Tid, WFT} || {queue, Oid, Tid, Op, Pid, WFT} <- Q].
1011
%%%%%%%%%%%%%%%%%%%%%%%%%%%
1014
system_continue(_Parent, _Debug, State) ->
1017
system_terminate(_Reason, _Parent, _Debug, _State) ->
1020
system_code_change(State, _Module, _OldVsn, _Extra) ->