~ubuntu-branches/ubuntu/saucy/rabbitmq-server/saucy-proposed

« back to all changes in this revision

Viewing changes to src/rabbit_backing_queue_qc.erl

  • Committer: Package Import Robot
  • Author(s): Emile Joubert
  • Date: 2013-05-02 11:19:31 UTC
  • mfrom: (0.5.2) (0.1.37 sid)
  • Revision ID: package-import@ubuntu.com-20130502111931-xnoj0sejto2tewcj
Tags: 3.1.0-1
New upstream release

Show diffs side-by-side

added added

removed removed

Lines of Context:
85
85
 
86
86
%% Commands
87
87
 
88
 
%% Command frequencies are tuned so that queues are normally reasonably
89
 
%% short, but they may sometimes exceed ?QUEUE_MAXLEN. Publish-multiple
90
 
%% and purging cause extreme queue lengths, so these have lower probabilities.
91
 
%% Fetches are sufficiently frequent so that commands that need acktags
92
 
%% get decent coverage.
 
88
%% Command frequencies are tuned so that queues are normally
 
89
%% reasonably short, but they may sometimes exceed
 
90
%% ?QUEUE_MAXLEN. Publish-multiple and purging cause extreme queue
 
91
%% lengths, so these have lower probabilities.  Fetches/drops are
 
92
%% sufficiently frequent so that commands that need acktags get decent
 
93
%% coverage.
93
94
 
94
95
command(S) ->
95
96
    frequency([{10, qc_publish(S)},
96
97
               {1,  qc_publish_delivered(S)},
97
98
               {1,  qc_publish_multiple(S)},  %% very slow
98
 
               {15, qc_fetch(S)},             %% needed for ack and requeue
 
99
               {9,  qc_fetch(S)},             %% needed for ack and requeue
 
100
               {6,  qc_drop(S)},              %%
99
101
               {15, qc_ack(S)},
100
102
               {15, qc_requeue(S)},
101
103
               {3,  qc_set_ram_duration_target(S)},
104
106
               {1,  qc_dropwhile(S)},
105
107
               {1,  qc_is_empty(S)},
106
108
               {1,  qc_timeout(S)},
107
 
               {1,  qc_purge(S)}]).
 
109
               {1,  qc_purge(S)},
 
110
               {1,  qc_fold(S)}]).
108
111
 
109
112
qc_publish(#state{bqstate = BQ}) ->
110
113
    {call, ?BQMOD, publish,
112
115
      #message_properties{needs_confirming = frequency([{1,  true},
113
116
                                                        {20, false}]),
114
117
                          expiry = oneof([undefined | lists:seq(1, 10)])},
115
 
      self(), BQ]}.
 
118
      false, self(), BQ]}.
116
119
 
117
120
qc_publish_multiple(#state{}) ->
118
121
    {call, ?MODULE, publish_multiple, [resize(?QUEUE_MAXLEN, pos_integer())]}.
124
127
qc_fetch(#state{bqstate = BQ}) ->
125
128
    {call, ?BQMOD, fetch, [boolean(), BQ]}.
126
129
 
 
130
qc_drop(#state{bqstate = BQ}) ->
 
131
    {call, ?BQMOD, drop, [boolean(), BQ]}.
 
132
 
127
133
qc_ack(#state{bqstate = BQ, acks = Acks}) ->
128
134
    {call, ?BQMOD, ack, [rand_choice(proplists:get_keys(Acks)), BQ]}.
129
135
 
141
147
    {call, ?BQMOD, drain_confirmed, [BQ]}.
142
148
 
143
149
qc_dropwhile(#state{bqstate = BQ}) ->
144
 
    {call, ?BQMOD, dropwhile, [fun dropfun/1, false, BQ]}.
 
150
    {call, ?BQMOD, dropwhile, [fun dropfun/1, BQ]}.
145
151
 
146
152
qc_is_empty(#state{bqstate = BQ}) ->
147
153
    {call, ?BQMOD, is_empty, [BQ]}.
152
158
qc_purge(#state{bqstate = BQ}) ->
153
159
    {call, ?BQMOD, purge, [BQ]}.
154
160
 
 
161
qc_fold(#state{bqstate = BQ}) ->
 
162
    {call, ?BQMOD, fold, [makefoldfun(pos_integer()), foldacc(), BQ]}.
 
163
 
155
164
%% Preconditions
156
165
 
157
166
%% Create long queues by only allowing publishing
173
182
 
174
183
%% Model updates
175
184
 
176
 
next_state(S, BQ, {call, ?BQMOD, publish, [Msg, MsgProps, _Pid, _BQ]}) ->
 
185
next_state(S, BQ, {call, ?BQMOD, publish, [Msg, MsgProps, _Del, _Pid, _BQ]}) ->
177
186
    #state{len         = Len,
178
187
           messages    = Messages,
179
188
           confirms    = Confirms,
217
226
           };
218
227
 
219
228
next_state(S, Res, {call, ?BQMOD, fetch, [AckReq, _BQ]}) ->
220
 
    #state{len = Len, messages = Messages, acks = Acks} = S,
221
 
    ResultInfo = {call, erlang, element, [1, Res]},
222
 
    BQ1        = {call, erlang, element, [2, Res]},
223
 
    AckTag     = {call, erlang, element, [3, ResultInfo]},
224
 
    S1         = S#state{bqstate = BQ1},
225
 
    case gb_trees:is_empty(Messages) of
226
 
        true  -> S1;
227
 
        false -> {SeqId, MsgProp_Msg, M2} = gb_trees:take_smallest(Messages),
228
 
                 S2 = S1#state{len = Len - 1, messages = M2},
229
 
                 case AckReq of
230
 
                     true  ->
231
 
                         S2#state{acks = [{AckTag, {SeqId, MsgProp_Msg}}|Acks]};
232
 
                     false ->
233
 
                         S2
234
 
                 end
235
 
    end;
 
229
    next_state_fetch_and_drop(S, Res, AckReq, 3);
 
230
 
 
231
next_state(S, Res, {call, ?BQMOD, drop, [AckReq, _BQ]}) ->
 
232
    next_state_fetch_and_drop(S, Res, AckReq, 2);
236
233
 
237
234
next_state(S, Res, {call, ?BQMOD, ack, [AcksArg, _BQ]}) ->
238
235
    #state{acks = AcksState} = S,
265
262
    S#state{bqstate = BQ1};
266
263
 
267
264
next_state(S, Res, {call, ?BQMOD, dropwhile, _Args}) ->
268
 
    BQ = {call, erlang, element, [3, Res]},
 
265
    BQ = {call, erlang, element, [2, Res]},
269
266
    #state{messages = Messages} = S,
270
267
    Msgs1 = drop_messages(Messages),
271
268
    S#state{bqstate = BQ, len = gb_trees:size(Msgs1), messages = Msgs1};
278
275
 
279
276
next_state(S, Res, {call, ?BQMOD, purge, _Args}) ->
280
277
    BQ1 = {call, erlang, element, [2, Res]},
281
 
    S#state{bqstate = BQ1, len = 0, messages = gb_trees:empty()}.
 
278
    S#state{bqstate = BQ1, len = 0, messages = gb_trees:empty()};
 
279
 
 
280
next_state(S, Res, {call, ?BQMOD, fold, _Args}) ->
 
281
    BQ1 = {call, erlang, element, [2, Res]},
 
282
    S#state{bqstate = BQ1}.
282
283
 
283
284
%% Postconditions
284
285
 
285
286
postcondition(S, {call, ?BQMOD, fetch, _Args}, Res) ->
286
287
    #state{messages = Messages, len = Len, acks = Acks, confirms = Confrms} = S,
287
288
    case Res of
288
 
        {{MsgFetched, _IsDelivered, AckTag, RemainingLen}, _BQ} ->
 
289
        {{MsgFetched, _IsDelivered, AckTag}, _BQ} ->
289
290
            {_SeqId, {_MsgProps, Msg}} = gb_trees:smallest(Messages),
290
291
            MsgFetched =:= Msg andalso
291
292
            not proplists:is_defined(AckTag, Acks) andalso
292
293
                not gb_sets:is_element(AckTag, Confrms) andalso
293
 
                RemainingLen =:= Len - 1;
 
294
                Len =/= 0;
 
295
        {empty, _BQ} ->
 
296
            Len =:= 0
 
297
    end;
 
298
 
 
299
postcondition(S, {call, ?BQMOD, drop, _Args}, Res) ->
 
300
    #state{messages = Messages, len = Len, acks = Acks, confirms = Confrms} = S,
 
301
    case Res of
 
302
        {{MsgIdFetched, AckTag}, _BQ} ->
 
303
            {_SeqId, {_MsgProps, Msg}} = gb_trees:smallest(Messages),
 
304
            MsgId = eval({call, erlang, element,
 
305
                          [?RECORD_INDEX(id, basic_message), Msg]}),
 
306
            MsgIdFetched =:= MsgId andalso
 
307
            not proplists:is_defined(AckTag, Acks) andalso
 
308
                not gb_sets:is_element(AckTag, Confrms) andalso
 
309
                Len =/= 0;
294
310
        {empty, _BQ} ->
295
311
            Len =:= 0
296
312
    end;
313
329
    lists:all(fun (M) -> gb_sets:is_element(M, Confirms) end,
314
330
              ReportedConfirmed);
315
331
 
 
332
postcondition(S, {call, ?BQMOD, fold, [FoldFun, Acc0, _BQ0]}, {Res, _BQ1}) ->
 
333
    #state{messages = Messages} = S,
 
334
    {_, Model} = lists:foldl(fun ({_SeqId, {_MsgProps, _Msg}}, {stop, Acc}) ->
 
335
                                     {stop, Acc};
 
336
                                 ({_SeqId, {MsgProps, Msg}}, {cont, Acc}) ->
 
337
                                     FoldFun(Msg, MsgProps, false, Acc)
 
338
                             end, {cont, Acc0}, gb_trees:to_list(Messages)),
 
339
    true = Model =:= Res;
 
340
 
316
341
postcondition(#state{bqstate = BQ, len = Len}, {call, _M, _F, _A}, _Res) ->
317
342
    ?BQMOD:len(BQ) =:= Len.
318
343
 
371
396
                       rand_choice(List -- [Picked], [Picked | Selection],
372
397
                       N - 1).
373
398
 
 
399
makefoldfun(Size) ->
 
400
    fun (Msg, _MsgProps, Unacked, Acc) ->
 
401
            case {length(Acc) > Size, Unacked} of
 
402
                {false, false} -> {cont, [Msg | Acc]};
 
403
                {false, true}  -> {cont, Acc};
 
404
                {true, _}      -> {stop, Acc}
 
405
            end
 
406
    end.
 
407
foldacc() -> [].
 
408
 
374
409
dropfun(Props) ->
375
410
    Expiry = eval({call, erlang, element,
376
411
                   [?RECORD_INDEX(expiry, message_properties), Props]}),
388
423
            end
389
424
    end.
390
425
 
 
426
next_state_fetch_and_drop(S, Res, AckReq, AckTagIdx) ->
 
427
    #state{len = Len, messages = Messages, acks = Acks} = S,
 
428
    ResultInfo = {call, erlang, element, [1, Res]},
 
429
    BQ1        = {call, erlang, element, [2, Res]},
 
430
    AckTag     = {call, erlang, element, [AckTagIdx, ResultInfo]},
 
431
    S1         = S#state{bqstate = BQ1},
 
432
    case gb_trees:is_empty(Messages) of
 
433
        true  -> S1;
 
434
        false -> {SeqId, MsgProp_Msg, M2} = gb_trees:take_smallest(Messages),
 
435
                 S2 = S1#state{len = Len - 1, messages = M2},
 
436
                 case AckReq of
 
437
                     true  ->
 
438
                         S2#state{acks = [{AckTag, {SeqId, MsgProp_Msg}}|Acks]};
 
439
                     false ->
 
440
                         S2
 
441
                 end
 
442
    end.
 
443
 
391
444
-else.
392
445
 
393
446
-export([prop_disabled/0]).