1
/* ``The contents of this file are subject to the Erlang Public License,
4
* Copyright Ericsson AB 1997-2009. All Rights Reserved.
6
* The contents of this file are subject to the Erlang Public License,
2
7
* Version 1.1, (the "License"); you may not use this file except in
3
8
* compliance with the License. You should have received a copy of the
4
9
* Erlang Public License along with this software. If not, it can be
5
* retrieved via the world wide web at http://www.erlang.org/.
10
* retrieved online at http://www.erlang.org/.
7
12
* Software distributed under the License is distributed on an "AS IS"
8
13
* basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
9
14
* the License for the specific language governing rights and limitations
10
15
* under the License.
12
* The Initial Developer of the Original Code is Ericsson Utvecklings AB.
13
* Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
14
* AB. All Rights Reserved.''
19
20
* Message passing primitives.
240
241
/* Add a message last in message queue */
242
243
erts_queue_message(Process* receiver,
243
ErtsProcLocks receiver_locks,
244
ErtsProcLocks *receiver_locks,
244
245
ErlHeapFragment* bp,
246
247
Eterm seq_trace_token)
250
#if !defined(ERTS_SMP)
251
ErtsProcLocks need_locks;
251
253
ASSERT(bp != NULL || receiver->mbuf == NULL);
254
ERTS_SMP_LC_ASSERT(receiver_locks == erts_proc_lc_my_proc_locks(receiver));
255
ERTS_SMP_LC_ASSERT((ERTS_PROC_LOCKS_MSG_SEND & receiver_locks)
256
== ERTS_PROC_LOCKS_MSG_SEND);
256
ERTS_SMP_LC_ASSERT(*receiver_locks == erts_proc_lc_my_proc_locks(receiver));
258
mp = message_alloc();
259
if (ERTS_PROC_PENDING_EXIT(receiver)) {
260
/* Drop message if receiver has a pending exit ... */
261
need_locks = ~(*receiver_locks) & (ERTS_PROC_LOCK_MSGQ
262
| ERTS_PROC_LOCK_STATUS);
264
*receiver_locks |= need_locks;
265
if (erts_smp_proc_trylock(receiver, need_locks) == EBUSY) {
266
if (need_locks == ERTS_PROC_LOCK_MSGQ) {
267
erts_smp_proc_unlock(receiver, ERTS_PROC_LOCK_STATUS);
268
need_locks = (ERTS_PROC_LOCK_MSGQ
269
| ERTS_PROC_LOCK_STATUS);
271
erts_smp_proc_lock(receiver, need_locks);
275
if (receiver->is_exiting || ERTS_PROC_PENDING_EXIT(receiver)) {
276
/* Drop message if receiver is exiting or has a pending
262
280
free_message_buffer(bp);
267
mp = message_alloc();
268
286
ERL_MESSAGE_TERM(mp) = message;
269
287
ERL_MESSAGE_TOKEN(mp) = seq_trace_token;
273
if (receiver_locks & ERTS_PROC_LOCK_MAIN) {
291
if (*receiver_locks & ERTS_PROC_LOCK_MAIN) {
296
314
ACTIVATE(receiver);
298
if (receiver->status == P_WAITING) {
299
add_to_schedule_q(receiver);
300
} else if (receiver->status == P_SUSPENDED) {
316
switch (receiver->status) {
318
switch (receiver->gcstatus) {
301
329
receiver->rstatus = P_RUNABLE;
333
erts_add_to_runq(receiver);
304
339
if (IS_TRACED_FL(receiver, F_TRACE_RECEIVE)) {
629
665
BM_START_TIMER(send);
631
667
if (SEQ_TRACE_TOKEN(sender) != NIL && !(flags & ERTS_SND_FLG_NO_SEQ_TRACE)) {
635
670
BM_SWAP_TIMER(send,size);
636
msize = size_object(message);
671
msize = size_object(message);
637
672
BM_SWAP_TIMER(size,send);
639
674
seq_trace_update_send(sender);
692
727
ACTIVATE(receiver);
694
729
if (receiver->status == P_WAITING) {
695
add_to_schedule_q(receiver);
730
erts_add_to_runq(receiver);
696
731
} else if (receiver->status == P_SUSPENDED) {
697
732
receiver->rstatus = P_RUNABLE;
706
741
} else if (sender == receiver) {
707
742
/* Drop message if receiver has a pending exit ... */
708
if (!ERTS_PROC_PENDING_EXIT(receiver)) {
744
ErtsProcLocks need_locks = (~(*receiver_locks)
745
& (ERTS_PROC_LOCK_MSGQ
746
| ERTS_PROC_LOCK_STATUS));
748
*receiver_locks |= need_locks;
749
if (erts_smp_proc_trylock(receiver, need_locks) == EBUSY) {
750
if (need_locks == ERTS_PROC_LOCK_MSGQ) {
751
erts_smp_proc_unlock(receiver, ERTS_PROC_LOCK_STATUS);
752
need_locks = ERTS_PROC_LOCK_MSGQ|ERTS_PROC_LOCK_STATUS;
754
erts_smp_proc_lock(receiver, need_locks);
757
if (!ERTS_PROC_PENDING_EXIT(receiver))
709
760
ErlMessage* mp = message_alloc();
738
/* Drop message if receiver has a pending exit ... */
739
if (!ERTS_PROC_PENDING_EXIT(receiver)) {
740
BM_SWAP_TIMER(send,size);
741
msz = size_object(message);
742
BM_SWAP_TIMER(size,send);
743
hp = erts_alloc_message_heap(msz,&bp,&ohp,receiver,receiver_locks);
744
BM_SWAP_TIMER(send,copy);
745
message = copy_struct(message, msz, &hp, ohp);
746
BM_MESSAGE_COPIED(msz);
747
BM_SWAP_TIMER(copy,send);
748
erts_queue_message(receiver,
788
BM_SWAP_TIMER(send,size);
789
msize = size_object(message);
790
BM_SWAP_TIMER(size,send);
791
hp = erts_alloc_message_heap(msize,&bp,&ohp,receiver,receiver_locks);
792
BM_SWAP_TIMER(send,copy);
793
message = copy_struct(message, msize, &hp, ohp);
794
BM_MESSAGE_COPIED(msz);
795
BM_SWAP_TIMER(copy,send);
796
erts_queue_message(receiver, receiver_locks, bp, message, token);
752
797
BM_SWAP_TIMER(send,system);
754
799
ErlMessage* mp = message_alloc();
757
801
BM_SWAP_TIMER(send,size);
758
msize = size_object(message);
802
msize = size_object(message);
759
803
BM_SWAP_TIMER(size,send);
761
805
if (receiver->stop - receiver->htop <= msize) {
776
820
LINK_MESSAGE(receiver, mp);
778
822
if (receiver->status == P_WAITING) {
779
add_to_schedule_q(receiver);
823
erts_add_to_runq(receiver);
780
824
} else if (receiver->status == P_SUSPENDED) {
781
825
receiver->rstatus = P_RUNABLE;
824
868
/* the trace token must in this case be updated by the caller */
825
869
seq_trace_output(token, save, SEQ_TRACE_SEND, to->id, NULL);
826
870
temptoken = copy_struct(token, sz_token, &hp, &bp->off_heap);
827
erts_queue_message(to, *to_locksp, bp, save, temptoken);
871
erts_queue_message(to, to_locksp, bp, save, temptoken);
830
874
sz_reason = size_object(reason);
842
886
: copy_struct(from, sz_from, &hp, ohp));
843
887
save = TUPLE3(hp, am_EXIT, from_copy, mess);
844
erts_queue_message(to, *to_locksp, bp, save, NIL);
888
erts_queue_message(to, to_locksp, bp, save, NIL);