~ubuntu-branches/ubuntu/saucy/rabbitmq-server/saucy-proposed

« back to all changes in this revision

Viewing changes to src/rabbit_backing_queue.erl

  • Committer: Package Import Robot
  • Author(s): Emile Joubert
  • Date: 2013-05-02 11:19:31 UTC
  • mfrom: (0.5.2) (0.1.37 sid)
  • Revision ID: package-import@ubuntu.com-20130502111931-xnoj0sejto2tewcj
Tags: 3.1.0-1
New upstream release

Show diffs side-by-side

added added

removed removed

Lines of Context:
18
18
 
19
19
-ifdef(use_specs).
20
20
 
21
 
-export_type([async_callback/0]).
22
 
 
23
21
%% We can't specify a per-queue ack/state with callback signatures
24
22
-type(ack()   :: any()).
25
23
-type(state() :: any()).
26
24
 
27
25
-type(msg_ids() :: [rabbit_types:msg_id()]).
28
26
-type(fetch_result(Ack) ::
29
 
        ('empty' |
30
 
         %% Message,                  IsDelivered, AckTag, Remaining_Len
31
 
         {rabbit_types:basic_message(), boolean(), Ack, non_neg_integer()})).
 
27
        ('empty' | {rabbit_types:basic_message(), boolean(), Ack})).
 
28
-type(drop_result(Ack) ::
 
29
        ('empty' | {rabbit_types:msg_id(), Ack})).
32
30
-type(attempt_recovery() :: boolean()).
33
31
-type(purged_msg_count() :: non_neg_integer()).
34
32
-type(async_callback() ::
35
33
        fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')).
36
34
-type(duration() :: ('undefined' | 'infinity' | number())).
37
35
 
38
 
-type(msg_fun() :: fun((rabbit_types:basic_message(), ack()) -> 'ok') |
39
 
                   'undefined').
 
36
-type(msg_fun(A) :: fun ((rabbit_types:basic_message(), ack(), A) -> A)).
40
37
-type(msg_pred() :: fun ((rabbit_types:message_properties()) -> boolean())).
41
38
 
42
39
%% Called on startup with a list of durable queue names. The queues
72
69
%% content.
73
70
-callback delete_and_terminate(any(), state()) -> state().
74
71
 
75
 
%% Remove all messages in the queue, but not messages which have been
76
 
%% fetched and are pending acks.
 
72
%% Remove all 'fetchable' messages from the queue, i.e. all messages
 
73
%% except those that have been fetched already and are pending acks.
77
74
-callback purge(state()) -> {purged_msg_count(), state()}.
78
75
 
 
76
%% Remove all messages in the queue which have been fetched and are
 
77
%% pending acks.
 
78
-callback purge_acks(state()) -> state().
 
79
 
79
80
%% Publish a message.
80
81
-callback publish(rabbit_types:basic_message(),
81
 
                  rabbit_types:message_properties(), pid(), state()) ->
82
 
    state().
 
82
                  rabbit_types:message_properties(), boolean(), pid(),
 
83
                  state()) -> state().
83
84
 
84
85
%% Called for messages which have already been passed straight
85
86
%% out to a client. The queue will be empty for these calls
124
125
%% be ignored.
125
126
-callback drain_confirmed(state()) -> {msg_ids(), state()}.
126
127
 
127
 
%% Drop messages from the head of the queue while the supplied predicate returns
128
 
%% true. Also accepts a boolean parameter that determines whether the messages
129
 
%% necessitate an ack or not. If they do, the function returns a list of
130
 
%% messages with the respective acktags.
131
 
-callback dropwhile(msg_pred(), true, state())
132
 
                   -> {rabbit_types:message_properties() | undefined,
133
 
                       [{rabbit_types:basic_message(), ack()}], state()};
134
 
                   (msg_pred(), false, state())
135
 
                   -> {rabbit_types:message_properties() | undefined,
136
 
                       undefined, state()}.
 
128
%% Drop messages from the head of the queue while the supplied
 
129
%% predicate on message properties returns true. Returns the first
 
130
%% message properties for which the predictate returned false, or
 
131
%% 'undefined' if the whole backing queue was traversed w/o the
 
132
%% predicate ever returning false.
 
133
-callback dropwhile(msg_pred(), state())
 
134
                   -> {rabbit_types:message_properties() | undefined, state()}.
 
135
 
 
136
%% Like dropwhile, except messages are fetched in "require
 
137
%% acknowledgement" mode and are passed, together with their ack tag,
 
138
%% to the supplied function. The function is also fed an
 
139
%% accumulator. The result of fetchwhile is as for dropwhile plus the
 
140
%% accumulator.
 
141
-callback fetchwhile(msg_pred(), msg_fun(A), A, state())
 
142
                     -> {rabbit_types:message_properties() | undefined,
 
143
                         A, state()}.
137
144
 
138
145
%% Produce the next message.
139
146
-callback fetch(true,  state()) -> {fetch_result(ack()), state()};
140
147
               (false, state()) -> {fetch_result(undefined), state()}.
141
148
 
 
149
%% Remove the next message.
 
150
-callback drop(true,  state()) -> {drop_result(ack()), state()};
 
151
              (false, state()) -> {drop_result(undefined), state()}.
 
152
 
142
153
%% Acktags supplied are for messages which can now be forgotten
143
154
%% about. Must return 1 msg_id per Ack, in the same order as Acks.
144
155
-callback ack([ack()], state()) -> {msg_ids(), state()}.
145
156
 
146
 
%% Acktags supplied are for messages which should be processed. The
147
 
%% provided callback function is called with each message.
148
 
-callback fold(msg_fun(), state(), [ack()]) -> state().
149
 
 
150
157
%% Reinsert messages into the queue which have already been delivered
151
158
%% and were pending acknowledgement.
152
159
-callback requeue([ack()], state()) -> {msg_ids(), state()}.
153
160
 
 
161
%% Fold over messages by ack tag. The supplied function is called with
 
162
%% each message, its ack tag, and an accumulator.
 
163
-callback ackfold(msg_fun(A), A, state(), [ack()]) -> {A, state()}.
 
164
 
 
165
%% Fold over all the messages in a queue and return the accumulated
 
166
%% results, leaving the queue undisturbed.
 
167
-callback fold(fun((rabbit_types:basic_message(),
 
168
                    rabbit_types:message_properties(),
 
169
                    boolean(), A) -> {('stop' | 'cont'), A}),
 
170
               A, state()) -> {A, state()}.
 
171
 
154
172
%% How long is my queue?
155
173
-callback len(state()) -> non_neg_integer().
156
174
 
210
228
 
211
229
behaviour_info(callbacks) ->
212
230
    [{start, 1}, {stop, 0}, {init, 3}, {terminate, 2},
213
 
     {delete_and_terminate, 2}, {purge, 1}, {publish, 4},
214
 
     {publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1}, {dropwhile, 3},
215
 
     {fetch, 2}, {ack, 2}, {fold, 3}, {requeue, 2}, {len, 1},
 
231
     {delete_and_terminate, 2}, {purge, 1}, {purge_acks, 1}, {publish, 5},
 
232
     {publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1},
 
233
     {dropwhile, 2}, {fetchwhile, 4},
 
234
     {fetch, 2}, {ack, 2}, {requeue, 2}, {ackfold, 4}, {fold, 3}, {len, 1},
216
235
     {is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2},
217
236
     {ram_duration, 1}, {needs_timeout, 1}, {timeout, 1},
218
237
     {handle_pre_hibernate, 1}, {status, 1}, {invoke, 3}, {is_duplicate, 2}] ;