197
195
declare(Recover, From, State = #q{q = Q,
198
196
backing_queue = BQ,
199
197
backing_queue_state = undefined}) ->
200
case rabbit_amqqueue:internal_declare(Q, Recover) of
201
not_found -> {stop, normal, not_found, State};
202
Q -> gen_server2:reply(From, {new, Q}),
203
ok = file_handle_cache:register_callback(
204
rabbit_amqqueue, set_maximum_since_use,
206
ok = rabbit_memory_monitor:register(
207
self(), {rabbit_amqqueue,
208
set_ram_duration_target, [self()]}),
209
BQS = bq_init(BQ, Q, Recover),
210
State1 = process_args(State#q{backing_queue_state = BQS}),
211
rabbit_event:notify(queue_created,
212
infos(?CREATION_EVENT_KEYS, State1)),
213
rabbit_event:if_enabled(State1, #q.stats_timer,
214
fun() -> emit_stats(State1) end),
216
Q1 -> {stop, normal, {existing, Q1}, State}
198
case rabbit_amqqueue:internal_declare(Q, Recover =/= new) of
200
case matches(Recover, Q, Q1) of
202
gen_server2:reply(From, {new, Q}),
203
ok = file_handle_cache:register_callback(
204
rabbit_amqqueue, set_maximum_since_use, [self()]),
205
ok = rabbit_memory_monitor:register(
206
self(), {rabbit_amqqueue,
207
set_ram_duration_target, [self()]}),
208
BQS = bq_init(BQ, Q, Recover),
209
recovery_barrier(Recover),
210
State1 = process_args(State#q{backing_queue_state = BQS}),
211
rabbit_event:notify(queue_created,
212
infos(?CREATION_EVENT_KEYS, State1)),
213
rabbit_event:if_enabled(State1, #q.stats_timer,
214
fun() -> emit_stats(State1) end),
217
{stop, normal, {existing, Q1}, State}
220
{stop, normal, Err, State}
223
matches(new, Q1, Q2) ->
225
Q1#amqqueue.name =:= Q2#amqqueue.name andalso
226
Q1#amqqueue.durable =:= Q2#amqqueue.durable andalso
227
Q1#amqqueue.auto_delete =:= Q2#amqqueue.auto_delete andalso
228
Q1#amqqueue.exclusive_owner =:= Q2#amqqueue.exclusive_owner andalso
229
Q1#amqqueue.arguments =:= Q2#amqqueue.arguments andalso
230
Q1#amqqueue.pid =:= Q2#amqqueue.pid andalso
231
Q1#amqqueue.slave_pids =:= Q2#amqqueue.slave_pids;
232
matches(_, Q, Q) -> true;
233
matches(_, _Q, _Q1) -> false.
219
235
bq_init(BQ, Q, Recover) ->
237
BQ:init(Q, Recover =/= new,
222
238
fun (Mod, Fun) ->
223
239
rabbit_amqqueue:run_backing_queue(Self, Mod, Fun)
242
recovery_barrier(new) ->
244
recovery_barrier(BarrierPid) ->
245
MRef = erlang:monitor(process, BarrierPid),
247
{BarrierPid, go} -> erlang:demonitor(MRef, [flush]);
248
{'DOWN', MRef, process, _, _} -> ok
226
251
process_args(State = #q{q = #amqqueue{arguments = Arguments}}) ->
228
253
fun({Arg, Fun}, State1) ->
482
507
rabbit_misc:gb_trees_foreach(fun rabbit_misc:confirm_to_sender/2, CMs),
483
508
State#q{msg_id_to_channel = MTC1}.
485
should_confirm_message(#delivery{msg_seq_no = undefined}, _State) ->
487
should_confirm_message(#delivery{sender = SenderPid,
510
send_or_record_confirm(#delivery{msg_seq_no = undefined}, State) ->
512
send_or_record_confirm(#delivery{sender = SenderPid,
488
513
msg_seq_no = MsgSeqNo,
489
514
message = #basic_message {
490
515
is_persistent = true,
492
#q{q = #amqqueue{durable = true}}) ->
493
{eventually, SenderPid, MsgSeqNo, MsgId};
494
should_confirm_message(#delivery{sender = SenderPid,
495
msg_seq_no = MsgSeqNo},
497
{immediately, SenderPid, MsgSeqNo}.
499
needs_confirming({eventually, _, _, _}) -> true;
500
needs_confirming(_) -> false.
502
maybe_record_confirm_message({eventually, SenderPid, MsgSeqNo, MsgId},
503
State = #q{msg_id_to_channel = MTC}) ->
504
State#q{msg_id_to_channel =
505
gb_trees:insert(MsgId, {SenderPid, MsgSeqNo}, MTC)};
506
maybe_record_confirm_message({immediately, SenderPid, MsgSeqNo}, State) ->
517
State = #q{q = #amqqueue{durable = true},
518
msg_id_to_channel = MTC}) ->
519
MTC1 = gb_trees:insert(MsgId, {SenderPid, MsgSeqNo}, MTC),
520
{eventually, State#q{msg_id_to_channel = MTC1}};
521
send_or_record_confirm(#delivery{sender = SenderPid,
522
msg_seq_no = MsgSeqNo}, State) ->
507
523
rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]),
509
maybe_record_confirm_message(_Confirm, State) ->
524
{immediately, State}.
526
discard(#delivery{sender = SenderPid, message = #basic_message{id = MsgId}},
528
%% fake an 'eventual' confirm from BQ; noop if not needed
529
State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
530
confirm_messages([MsgId], State),
531
BQS1 = BQ:discard(MsgId, SenderPid, BQS),
532
State1#q{backing_queue_state = BQS1}.
512
534
run_message_queue(State) ->
513
535
State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
517
539
BQ:is_empty(BQS), State1),
520
attempt_delivery(#delivery{sender = SenderPid, message = Message}, Confirm,
542
attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
543
Props = #message_properties{delivered = Delivered},
521
544
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
522
545
case BQ:is_duplicate(Message, BQS) of
524
547
deliver_msgs_to_consumers(
525
fun (AckRequired, State1 = #q{backing_queue_state = BQS2}) ->
526
Props = message_properties(Confirm, State1),
548
fun (true, State1 = #q{backing_queue_state = BQS2}) ->
527
549
{AckTag, BQS3} = BQ:publish_delivered(
528
AckRequired, Message, Props,
530
{{Message, false, AckTag}, true,
531
State1#q{backing_queue_state = BQS3}}
550
Message, Props, SenderPid, BQS2),
551
{{Message, Delivered, AckTag},
552
true, State1#q{backing_queue_state = BQS3}};
554
{{Message, Delivered, undefined},
555
true, discard(Delivery, State1)}
532
556
end, false, State#q{backing_queue_state = BQS1});
534
%% if the message has previously been seen by the BQ then
535
%% it must have been seen under the same circumstances as
536
%% now: i.e. if it is now a deliver_immediately then it
537
%% must have been before.
542
State#q{backing_queue_state = BQS1}}
558
{true, State#q{backing_queue_state = BQS1}};
560
{false, State#q{backing_queue_state = BQS1}}
545
deliver_or_enqueue(Delivery = #delivery{message = Message,
546
sender = SenderPid}, State) ->
547
Confirm = should_confirm_message(Delivery, State),
548
case attempt_delivery(Delivery, Confirm, State) of
550
maybe_record_confirm_message(Confirm, State1);
551
%% the next one is an optimisations
552
%% TODO: optimise the Confirm =/= never case too
553
{false, State1 = #q{ttl = 0, dlx = undefined}} when Confirm == never ->
554
discard_delivery(Delivery, State1);
556
State2 = #q{backing_queue = BQ, backing_queue_state = BQS} =
557
maybe_record_confirm_message(Confirm, State1),
558
Props = message_properties(Confirm, State2),
563
deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
565
{Confirm, State1} = send_or_record_confirm(Delivery, State),
566
Props = message_properties(Message, Confirm, Delivered, State),
567
case attempt_delivery(Delivery, Props, State1) of
570
%% The next one is an optimisation
571
{false, State2 = #q{ttl = 0, dlx = undefined}} ->
572
discard(Delivery, State2);
573
{false, State2 = #q{backing_queue = BQ, backing_queue_state = BQS}} ->
559
574
BQS1 = BQ:publish(Message, Props, SenderPid, BQS),
560
575
ensure_ttl_timer(Props#message_properties.expiry,
561
576
State2#q{backing_queue_state = BQS1})
564
requeue_and_run(AckTags, State = #q{backing_queue = BQ}) ->
565
run_backing_queue(BQ, fun (M, BQS) ->
566
{_MsgIds, BQS1} = M:requeue(AckTags, BQS),
579
requeue_and_run(AckTags, State = #q{backing_queue = BQ,
580
backing_queue_state = BQS}) ->
581
{_MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
582
run_message_queue(State#q{backing_queue_state = BQS1}).
570
fetch(AckRequired, State = #q{backing_queue_state = BQS,
571
backing_queue = BQ}) ->
584
fetch(AckRequired, State = #q{backing_queue = BQ,
585
backing_queue_state = BQS}) ->
572
586
{Result, BQS1} = BQ:fetch(AckRequired, BQS),
573
587
{Result, State#q{backing_queue_state = BQS1}}.
589
ack(AckTags, ChPid, State) ->
590
subtract_acks(ChPid, AckTags, State,
591
fun (State1 = #q{backing_queue = BQ,
592
backing_queue_state = BQS}) ->
593
{_Guids, BQS1} = BQ:ack(AckTags, BQS),
594
State1#q{backing_queue_state = BQS1}
597
requeue(AckTags, ChPid, State) ->
598
subtract_acks(ChPid, AckTags, State,
599
fun (State1) -> requeue_and_run(AckTags, State1) end).
575
601
remove_consumer(ChPid, ConsumerTag, Queue) ->
576
602
queue:filter(fun ({CP, #consumer{tag = CTag}}) ->
577
603
(CP /= ChPid) or (CTag /= ConsumerTag)
682
discard_delivery(#delivery{sender = SenderPid,
684
State = #q{backing_queue = BQ,
685
backing_queue_state = BQS}) ->
686
State#q{backing_queue_state = BQ:discard(Message, SenderPid, BQS)}.
688
message_properties(Confirm, #q{ttl = TTL}) ->
689
#message_properties{expiry = calculate_msg_expiry(TTL),
690
needs_confirming = needs_confirming(Confirm)}.
692
calculate_msg_expiry(undefined) -> undefined;
693
calculate_msg_expiry(TTL) -> now_micros() + (TTL * 1000).
695
drop_expired_messages(State = #q{ttl = undefined}) ->
697
drop_expired_messages(State = #q{backing_queue_state = BQS,
705
message_properties(Message, Confirm, Delivered, #q{ttl = TTL}) ->
706
#message_properties{expiry = calculate_msg_expiry(Message, TTL),
707
needs_confirming = Confirm == eventually,
708
delivered = Delivered}.
710
calculate_msg_expiry(#basic_message{content = Content}, TTL) ->
711
#content{properties = Props} =
712
rabbit_binary_parser:ensure_content_decoded(Content),
713
%% We assert that the expiration must be valid - we check in the channel.
714
{ok, MsgTTL} = rabbit_basic:parse_expiration(Props),
715
case lists:min([TTL, MsgTTL]) of
716
undefined -> undefined;
717
T -> now_micros() + T * 1000
720
drop_expired_messages(State = #q{dlx = DLX,
721
backing_queue_state = BQS,
698
722
backing_queue = BQ }) ->
699
723
Now = now_micros(),
700
DLXFun = dead_letter_fun(expired, State),
701
724
ExpirePred = fun (#message_properties{expiry = Exp}) -> Now >= Exp end,
705
{Next, undefined, BQS2} = BQ:dropwhile(ExpirePred, false, BQS),
708
{Next, Msgs, BQS2} = BQ:dropwhile(ExpirePred, true, BQS),
709
lists:foreach(fun({Msg, AckTag}) -> DLXFun(Msg, AckTag) end,
725
{Props, BQS1} = case DLX of
726
undefined -> {Next, undefined, BQS2} =
727
BQ:dropwhile(ExpirePred, false, BQS),
729
_ -> {Next, Msgs, BQS2} =
730
BQ:dropwhile(ExpirePred, true, BQS),
731
DLXFun = dead_letter_fun(expired),
713
735
ensure_ttl_timer(case Props of
714
736
undefined -> undefined;
715
737
#message_properties{expiry = Exp} -> Exp
736
756
ensure_ttl_timer(_Expiry, State) ->
739
ack_if_no_dlx(AckTags, State = #q{dlx = undefined,
741
backing_queue_state = BQS }) ->
742
{_Guids, BQS1} = BQ:ack(AckTags, BQS),
743
State#q{backing_queue_state = BQS1};
744
ack_if_no_dlx(_AckTags, State) ->
747
dead_letter_fun(_Reason, #q{dlx = undefined}) ->
749
dead_letter_fun(Reason, _State) ->
751
gen_server2:cast(self(), {dead_letter, {Msg, AckTag}, Reason})
754
dead_letter_publish(Msg, Reason, State = #q{publish_seqno = MsgSeqNo}) ->
755
DLMsg = #basic_message{exchange_name = XName} =
756
make_dead_letter_msg(Reason, Msg, State),
757
case rabbit_exchange:lookup(XName) of
759
Delivery = rabbit_basic:delivery(false, false, DLMsg, MsgSeqNo),
760
{Queues, Cycles} = detect_dead_letter_cycles(
761
DLMsg, rabbit_exchange:route(X, Delivery)),
762
lists:foreach(fun log_cycle_once/1, Cycles),
763
QPids = rabbit_amqqueue:lookup(Queues),
764
{_, DeliveredQPids} = rabbit_amqqueue:deliver(QPids, Delivery),
766
{error, not_found} ->
770
dead_letter_msg(Msg, AckTag, Reason, State = #q{publish_seqno = MsgSeqNo,
771
unconfirmed = UC}) ->
772
QPids = dead_letter_publish(Msg, Reason, State),
773
State1 = State#q{queue_monitors = pmon:monitor_all(
774
QPids, State#q.queue_monitors),
775
publish_seqno = MsgSeqNo + 1},
777
[] -> cleanup_after_confirm([AckTag], State1);
778
_ -> UC1 = dtree:insert(MsgSeqNo, QPids, AckTag, UC),
779
noreply(State1#q{unconfirmed = UC1})
759
dead_letter_fun(Reason) ->
760
fun(Msgs) -> gen_server2:cast(self(), {dead_letter, Msgs, Reason}) end.
762
dead_letter_publish(Msg, Reason, X, State = #q{publish_seqno = MsgSeqNo}) ->
763
DLMsg = make_dead_letter_msg(Reason, Msg, State),
764
Delivery = rabbit_basic:delivery(false, DLMsg, MsgSeqNo),
765
{Queues, Cycles} = detect_dead_letter_cycles(
766
DLMsg, rabbit_exchange:route(X, Delivery)),
767
lists:foreach(fun log_cycle_once/1, Cycles),
768
{_, DeliveredQPids} = rabbit_amqqueue:deliver(
769
rabbit_amqqueue:lookup(Queues), Delivery),
782
772
handle_queue_down(QPid, Reason, State = #q{queue_monitors = QMons,
783
773
unconfirmed = UC}) ->
889
877
now_micros() -> timer:now_diff(now(), {0,0,0}).
891
infos(Items, State) ->
893
case lists:member(synchronised_slave_pids, Items) of
894
true -> Prefix1 = slaves_status(State),
895
case lists:member(slave_pids, Items) of
896
true -> {Prefix1, Items -- [slave_pids]};
897
false -> {proplists:delete(slave_pids, Prefix1), Items}
901
Prefix ++ [{Item, i(Item, State)}
902
|| Item <- (Items1 -- [synchronised_slave_pids])].
904
slaves_status(#q{q = #amqqueue{name = Name}}) ->
905
case rabbit_amqqueue:lookup(Name) of
906
{ok, #amqqueue{mirror_nodes = undefined}} ->
907
[{slave_pids, ''}, {synchronised_slave_pids, ''}];
908
{ok, #amqqueue{slave_pids = SPids}} ->
910
delegate:invoke(SPids, fun rabbit_mirror_queue_slave:info/1),
913
fun ({Pid, Infos}, {SPidsN, SSPidsN}) ->
915
case proplists:get_bool(is_synchronised, Infos) of
916
true -> [Pid | SSPidsN];
919
end, {[], []}, Results),
920
[{slave_pids, SPids1}, {synchronised_slave_pids, SSPids}]
879
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
923
881
i(name, #q{q = #amqqueue{name = Name}}) -> Name;
924
882
i(durable, #q{q = #amqqueue{durable = Durable}}) -> Durable;
1058
1033
handle_call(consumers, _From, State) ->
1059
1034
reply(consumers(State), State);
1061
handle_call({deliver, Delivery = #delivery{immediate = true}}, _From, State) ->
1062
%% FIXME: Is this correct semantics?
1064
%% I'm worried in particular about the case where an exchange has
1065
%% two queues against a particular routing key, and a message is
1066
%% sent in immediate mode through the binding. In non-immediate
1067
%% mode, both queues get the message, saving it for later if
1068
%% there's noone ready to receive it just now. In immediate mode,
1069
%% should both queues still get the message, somehow, or should
1070
%% just all ready-to-consume queues get the message, with unready
1071
%% queues discarding the message?
1073
Confirm = should_confirm_message(Delivery, State),
1074
{Delivered, State1} = attempt_delivery(Delivery, Confirm, State),
1075
reply(Delivered, case Delivered of
1076
true -> maybe_record_confirm_message(Confirm, State1);
1077
false -> discard_delivery(Delivery, State1)
1080
handle_call({deliver, Delivery = #delivery{mandatory = true}}, From, State) ->
1081
gen_server2:reply(From, true),
1082
noreply(deliver_or_enqueue(Delivery, State));
1036
handle_call({deliver, Delivery, Delivered}, From, State) ->
1037
%% Synchronous, "mandatory" deliver mode.
1038
gen_server2:reply(From, ok),
1039
noreply(deliver_or_enqueue(Delivery, Delivered, State));
1084
1041
handle_call({notify_down, ChPid}, From, State) ->
1085
1042
%% we want to do this synchronously, so that auto_deleted queues
1086
1043
%% are no longer visible by the time we send a response to the
1087
1044
%% client. The queue is ultimately deleted in terminate/2; if we
1088
1045
%% return stop with a reply, terminate/2 will be called by
1089
%% gen_server2 *before* the reply is sent.
1046
%% gen_server2 *before* the reply is sent. FIXME: in case of a
1047
%% delayed stop the reply is sent earlier.
1090
1048
case handle_ch_down(ChPid, State) of
1091
1049
{ok, State1} -> reply(ok, State1);
1092
{stop, State1} -> stop_later(normal, From, ok, State1)
1050
{stop, State1} -> stop(From, ok, State1)
1095
1053
handle_call({basic_get, ChPid, NoAck}, _From,
1192
1149
handle_call({requeue, AckTags, ChPid}, From, State) ->
1193
1150
gen_server2:reply(From, ok),
1194
noreply(subtract_acks(
1195
ChPid, AckTags, State,
1196
fun (State1) -> requeue_and_run(AckTags, State1) end));
1151
noreply(requeue(AckTags, ChPid, State));
1153
handle_call(start_mirroring, _From, State = #q{backing_queue = BQ,
1154
backing_queue_state = BQS}) ->
1155
%% lookup again to get policy for init_with_existing_bq
1156
{ok, Q} = rabbit_amqqueue:lookup(qname(State)),
1157
true = BQ =/= rabbit_mirror_queue_master, %% assertion
1158
BQ1 = rabbit_mirror_queue_master,
1159
BQS1 = BQ1:init_with_existing_bq(Q, BQ, BQS),
1160
reply(ok, State#q{backing_queue = BQ1,
1161
backing_queue_state = BQS1});
1163
handle_call(stop_mirroring, _From, State = #q{backing_queue = BQ,
1164
backing_queue_state = BQS}) ->
1165
BQ = rabbit_mirror_queue_master, %% assertion
1166
{BQ1, BQS1} = BQ:stop_mirroring(BQS),
1167
reply(ok, State#q{backing_queue = BQ1,
1168
backing_queue_state = BQS1});
1198
1170
handle_call(force_event_refresh, _From,
1199
1171
State = #q{exclusive_consumer = Exclusive}) ->
1219
1191
handle_cast(_, State = #q{delayed_stop = DS}) when DS =/= undefined ->
1220
1192
noreply(State);
1222
handle_cast({run_backing_queue, Mod, Fun}, State) ->
1223
noreply(run_backing_queue(Mod, Fun, State));
1194
handle_cast({run_backing_queue, Mod, Fun},
1195
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
1196
noreply(run_message_queue(
1197
State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)}));
1225
handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow},
1199
handle_cast({deliver, Delivery = #delivery{sender = Sender}, Delivered, Flow},
1226
1200
State = #q{senders = Senders}) ->
1227
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
1201
%% Asynchronous, non-"mandatory" deliver mode.
1228
1202
Senders1 = case Flow of
1229
1203
flow -> credit_flow:ack(Sender),
1230
1204
pmon:monitor(Sender, Senders);
1231
1205
noflow -> Senders
1233
1207
State1 = State#q{senders = Senders1},
1234
noreply(deliver_or_enqueue(Delivery, State1));
1208
noreply(deliver_or_enqueue(Delivery, Delivered, State1));
1236
1210
handle_cast({ack, AckTags, ChPid}, State) ->
1211
noreply(ack(AckTags, ChPid, State));
1213
handle_cast({reject, AckTags, true, ChPid}, State) ->
1214
noreply(requeue(AckTags, ChPid, State));
1216
handle_cast({reject, AckTags, false, ChPid}, State = #q{dlx = undefined}) ->
1217
noreply(ack(AckTags, ChPid, State));
1219
handle_cast({reject, AckTags, false, ChPid}, State) ->
1220
DLXFun = dead_letter_fun(rejected),
1237
1221
noreply(subtract_acks(
1238
1222
ChPid, AckTags, State,
1239
1223
fun (State1 = #q{backing_queue = BQ,
1240
1224
backing_queue_state = BQS}) ->
1241
{_Guids, BQS1} = BQ:ack(AckTags, BQS),
1225
BQS1 = BQ:fold(fun(M, A) -> DLXFun([{M, A}]) end,
1242
1227
State1#q{backing_queue_state = BQS1}
1245
handle_cast({reject, AckTags, Requeue, ChPid}, State) ->
1246
noreply(subtract_acks(
1247
ChPid, AckTags, State,
1249
true -> fun (State1) -> requeue_and_run(AckTags, State1) end;
1250
false -> fun (State1 = #q{backing_queue = BQ,
1251
backing_queue_state = BQS}) ->
1252
Fun = dead_letter_fun(rejected, State1),
1253
BQS1 = BQ:fold(Fun, BQS, AckTags),
1256
State1#q{backing_queue_state = BQS1})
1260
1230
handle_cast(delete_immediately, State) ->
1261
stop_later(normal, State);
1263
1233
handle_cast({unblock, ChPid}, State) ->
1301
1271
ok = file_handle_cache:set_maximum_since_use(Age),
1302
1272
noreply(State);
1304
handle_cast({dead_letter, {Msg, AckTag}, Reason}, State) ->
1305
dead_letter_msg(Msg, AckTag, Reason, State).
1274
handle_cast({dead_letter, Msgs, Reason}, State = #q{dlx = XName}) ->
1275
case rabbit_exchange:lookup(XName) of
1277
noreply(lists:foldl(
1278
fun({Msg, AckTag}, State1 = #q{publish_seqno = SeqNo,
1280
queue_monitors = QMon}) ->
1281
QPids = dead_letter_publish(Msg, Reason, X,
1283
UC1 = dtree:insert(SeqNo, QPids, AckTag, UC),
1284
QMons = pmon:monitor_all(QPids, QMon),
1285
State1#q{queue_monitors = QMons,
1286
publish_seqno = SeqNo + 1,
1289
{error, not_found} ->
1290
cleanup_after_confirm([AckTag || {_, AckTag} <- Msgs], State)
1293
handle_cast(wake_up, State) ->
1307
1296
%% We need to not ignore this as we need to remove outstanding
1308
1297
%% confirms due to queue death.