21
-export_type([async_callback/0]).
23
21
%% We can't specify a per-queue ack/state with callback signatures
24
22
-type(ack() :: any()).
25
23
-type(state() :: any()).
27
25
-type(msg_ids() :: [rabbit_types:msg_id()]).
28
26
-type(fetch_result(Ack) ::
30
%% Message, IsDelivered, AckTag, Remaining_Len
31
{rabbit_types:basic_message(), boolean(), Ack, non_neg_integer()})).
27
('empty' | {rabbit_types:basic_message(), boolean(), Ack})).
28
-type(drop_result(Ack) ::
29
('empty' | {rabbit_types:msg_id(), Ack})).
32
30
-type(attempt_recovery() :: boolean()).
33
31
-type(purged_msg_count() :: non_neg_integer()).
34
32
-type(async_callback() ::
35
33
fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')).
36
34
-type(duration() :: ('undefined' | 'infinity' | number())).
38
-type(msg_fun() :: fun((rabbit_types:basic_message(), ack()) -> 'ok') |
36
-type(msg_fun(A) :: fun ((rabbit_types:basic_message(), ack(), A) -> A)).
40
37
-type(msg_pred() :: fun ((rabbit_types:message_properties()) -> boolean())).
42
39
%% Called on startup with a list of durable queue names. The queues
73
70
-callback delete_and_terminate(any(), state()) -> state().
75
%% Remove all messages in the queue, but not messages which have been
76
%% fetched and are pending acks.
72
%% Remove all 'fetchable' messages from the queue, i.e. all messages
73
%% except those that have been fetched already and are pending acks.
77
74
-callback purge(state()) -> {purged_msg_count(), state()}.
76
%% Remove all messages in the queue which have been fetched and are
78
-callback purge_acks(state()) -> state().
79
80
%% Publish a message.
80
81
-callback publish(rabbit_types:basic_message(),
81
rabbit_types:message_properties(), pid(), state()) ->
82
rabbit_types:message_properties(), boolean(), pid(),
84
85
%% Called for messages which have already been passed straight
85
86
%% out to a client. The queue will be empty for these calls
125
126
-callback drain_confirmed(state()) -> {msg_ids(), state()}.
127
%% Drop messages from the head of the queue while the supplied predicate returns
128
%% true. Also accepts a boolean parameter that determines whether the messages
129
%% necessitate an ack or not. If they do, the function returns a list of
130
%% messages with the respective acktags.
131
-callback dropwhile(msg_pred(), true, state())
132
-> {rabbit_types:message_properties() | undefined,
133
[{rabbit_types:basic_message(), ack()}], state()};
134
(msg_pred(), false, state())
135
-> {rabbit_types:message_properties() | undefined,
128
%% Drop messages from the head of the queue while the supplied
129
%% predicate on message properties returns true. Returns the first
130
%% message properties for which the predictate returned false, or
131
%% 'undefined' if the whole backing queue was traversed w/o the
132
%% predicate ever returning false.
133
-callback dropwhile(msg_pred(), state())
134
-> {rabbit_types:message_properties() | undefined, state()}.
136
%% Like dropwhile, except messages are fetched in "require
137
%% acknowledgement" mode and are passed, together with their ack tag,
138
%% to the supplied function. The function is also fed an
139
%% accumulator. The result of fetchwhile is as for dropwhile plus the
141
-callback fetchwhile(msg_pred(), msg_fun(A), A, state())
142
-> {rabbit_types:message_properties() | undefined,
138
145
%% Produce the next message.
139
146
-callback fetch(true, state()) -> {fetch_result(ack()), state()};
140
147
(false, state()) -> {fetch_result(undefined), state()}.
149
%% Remove the next message.
150
-callback drop(true, state()) -> {drop_result(ack()), state()};
151
(false, state()) -> {drop_result(undefined), state()}.
142
153
%% Acktags supplied are for messages which can now be forgotten
143
154
%% about. Must return 1 msg_id per Ack, in the same order as Acks.
144
155
-callback ack([ack()], state()) -> {msg_ids(), state()}.
146
%% Acktags supplied are for messages which should be processed. The
147
%% provided callback function is called with each message.
148
-callback fold(msg_fun(), state(), [ack()]) -> state().
150
157
%% Reinsert messages into the queue which have already been delivered
151
158
%% and were pending acknowledgement.
152
159
-callback requeue([ack()], state()) -> {msg_ids(), state()}.
161
%% Fold over messages by ack tag. The supplied function is called with
162
%% each message, its ack tag, and an accumulator.
163
-callback ackfold(msg_fun(A), A, state(), [ack()]) -> {A, state()}.
165
%% Fold over all the messages in a queue and return the accumulated
166
%% results, leaving the queue undisturbed.
167
-callback fold(fun((rabbit_types:basic_message(),
168
rabbit_types:message_properties(),
169
boolean(), A) -> {('stop' | 'cont'), A}),
170
A, state()) -> {A, state()}.
154
172
%% How long is my queue?
155
173
-callback len(state()) -> non_neg_integer().
211
229
behaviour_info(callbacks) ->
212
230
[{start, 1}, {stop, 0}, {init, 3}, {terminate, 2},
213
{delete_and_terminate, 2}, {purge, 1}, {publish, 4},
214
{publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1}, {dropwhile, 3},
215
{fetch, 2}, {ack, 2}, {fold, 3}, {requeue, 2}, {len, 1},
231
{delete_and_terminate, 2}, {purge, 1}, {purge_acks, 1}, {publish, 5},
232
{publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1},
233
{dropwhile, 2}, {fetchwhile, 4},
234
{fetch, 2}, {ack, 2}, {requeue, 2}, {ackfold, 4}, {fold, 3}, {len, 1},
216
235
{is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2},
217
236
{ram_duration, 1}, {needs_timeout, 1}, {timeout, 1},
218
237
{handle_pre_hibernate, 1}, {status, 1}, {invoke, 3}, {is_duplicate, 2}] ;