~ubuntu-branches/ubuntu/trusty/ejabberd/trusty-proposed

« back to all changes in this revision

Viewing changes to src/p1_fsm.erl

  • Committer: Bazaar Package Importer
  • Author(s): Gerfried Fuchs, Konstantin Khomoutov, Gerfried Fuchs
  • Date: 2009-12-04 18:22:49 UTC
  • mfrom: (1.1.11 upstream)
  • Revision ID: james.westby@ubuntu.com-20091204182249-6jfmdz8878h7oaos
Tags: 2.1.0-1
[ Konstantin Khomoutov ]
* New upstream release (Closes: #519858).
  This also adds support for LDAPS upstream (Closes: #526145).
* Do not depend on cdbs anymore, port debian/rules to dh+quilt,
  remove build dependency on patchutils, use erlang-depends.
* Bump debhelper version to 7, standards base to 3.8.3
* Depend on erlang R13B.
* Recommend imagemagick (for captcha support).
* Remove deprecated patches (ssl.patch patch, dynamic_compile_loglevel.patch,
  ldaps.patch, update.patch, proxy.patch, caps.patch, convert.patch,
  s2s.patch).
* Replace mod_ctlextra with mod_admin_extra.
* Use upstream inetrc file.
* Bring debian/ejabberd.cfg and ejabberdctl in sync with upstream.
* Update ejabberdctl manual page.
* Provide NEWS file.
* Rework README.Debian:
  * Group all information into sections.
  * Describe issues with epam binary (Closes: #502791).
  * Discuss how to use DBMS backends (Closes: #540915, #507144).
  * Discuss upgrading from 2.0.x series.
* Implement PID file management (Closes: #519858).
* Make logrotate process all files matching "*.log".
* Improve init script:
  * Make init script LSB-compliant.
  * Implement "live" target which allows to run ejabberd in foreground.
* Make captcha.sh use bash explicitly.
* Rework node-generation for ejabberdctl to fix ejabberd's atom table
  overflows while preserving the possibility to run several versions
  of ejabberdctl concurrently as before.
* Add webadmin patch restoring compatibility with Erlang/OTP <= R12B-4.
* Integrate upstream patch for EJAB-1106.
* Add upstream patch for EJAB-1098.
* Add upstream patch for EJAB-1045.
* Add Konstantin Khomoutov to uploaders.
* Add Japanese debconf translation (thanks to Hideki Yamane)
  (Closes: #558071).

[ Gerfried Fuchs ]
* Build-Depend on po-debconf so po2debconf can be called.

Show diffs side-by-side

added added

removed removed

Lines of Context:
21
21
%%   terminate immediatetly. If the fsm trap_exit process flag has been
22
22
%%   set to true, the FSM terminate function will called.
23
23
%%   - You can pass the gen_fsm options to control resource usage.
24
 
%%   {max_messages, N} will exit the process with priority_shutdown
 
24
%%   {max_queue, N} will exit the process with priority_shutdown
25
25
%%   - You can limit the time processing a message (TODO): If the
26
26
%%   message processing does not return in a given period of time, the
27
27
%%   process will be terminated.
28
28
%% 
29
 
%%     $Id: p1_fsm.erl 1805 2009-01-12 14:52:59Z badlop $
 
29
%%     $Id: p1_fsm.erl 2644 2009-10-07 13:41:36Z ekhramtsov $
30
30
%%
31
31
-module(p1_fsm).
32
32
 
123
123
         sync_send_all_state_event/2, sync_send_all_state_event/3,
124
124
         reply/2,
125
125
         start_timer/2,send_event_after/2,cancel_timer/1,
126
 
         enter_loop/4, enter_loop/5, enter_loop/6]).
 
126
         enter_loop/4, enter_loop/5, enter_loop/6, wake_hib/7]).
127
127
 
128
128
-export([behaviour_info/1]).
129
129
 
273
273
    Name = get_proc_name(ServerName),
274
274
    Parent = get_parent(),
275
275
    Debug = gen:debug_options(Options),
276
 
    Limits= limit_options(Options),
277
 
    loop(Parent, Name, StateName, StateData, Mod, Timeout, Debug, Limits).
 
276
    Limits = limit_options(Options),
 
277
    Queue = queue:new(),
 
278
    QueueLen = 0,
 
279
    loop(Parent, Name, StateName, StateData, Mod, Timeout, Debug,
 
280
         Limits, Queue, QueueLen).
278
281
 
279
282
get_proc_name(Pid) when is_pid(Pid) ->
280
283
    Pid;
329
332
%%% ---------------------------------------------------
330
333
init_it(Starter, self, Name, Mod, Args, Options) ->
331
334
    init_it(Starter, self(), Name, Mod, Args, Options);
332
 
init_it(Starter, Parent, Name, Mod, Args, Options) ->
 
335
init_it(Starter, Parent, Name0, Mod, Args, Options) ->
 
336
    Name = name(Name0),
333
337
    Debug = gen:debug_options(Options),
334
 
    Limits= limit_options(Options),
 
338
    Limits = limit_options(Options),
 
339
    Queue = queue:new(),
 
340
    QueueLen = 0,
335
341
    case catch Mod:init(Args) of
336
342
        {ok, StateName, StateData} ->
337
343
            proc_lib:init_ack(Starter, {ok, self()}),       
338
 
            loop(Parent, Name, StateName, StateData, Mod, infinity, Debug, Limits);
 
344
            loop(Parent, Name, StateName, StateData, Mod, infinity, Debug, Limits, Queue, QueueLen);
339
345
        {ok, StateName, StateData, Timeout} ->
340
346
            proc_lib:init_ack(Starter, {ok, self()}),       
341
 
            loop(Parent, Name, StateName, StateData, Mod, Timeout, Debug, Limits);
 
347
            loop(Parent, Name, StateName, StateData, Mod, Timeout, Debug, Limits, Queue, QueueLen);
342
348
        {stop, Reason} ->
343
349
            proc_lib:init_ack(Starter, {error, Reason}),
344
350
            exit(Reason);
354
360
            exit(Error)
355
361
    end.
356
362
 
 
363
name({local,Name}) -> Name;
 
364
name({global,Name}) -> Name;
 
365
name(Pid) when is_pid(Pid) -> Pid.
 
366
 
357
367
%%-----------------------------------------------------------------
358
368
%% The MAIN loop
359
369
%%-----------------------------------------------------------------
 
370
loop(Parent, Name, StateName, StateData, Mod, hibernate, Debug,
 
371
     Limits, Queue, QueueLen)
 
372
  when QueueLen > 0 ->
 
373
    case queue:out(Queue) of
 
374
        {{value, Msg}, Queue1} ->
 
375
            decode_msg(Msg, Parent, Name, StateName, StateData, Mod, hibernate,
 
376
                       Debug, Limits, Queue1, QueueLen - 1, false);
 
377
        {empty, _} ->
 
378
            Reason = internal_queue_error,
 
379
            error_info(Reason, Name, hibernate, StateName, StateData, Debug),
 
380
            exit(Reason)
 
381
    end;
 
382
loop(Parent, Name, StateName, StateData, Mod, hibernate, Debug,
 
383
     Limits, _Queue, _QueueLen) ->
 
384
    proc_lib:hibernate(?MODULE,wake_hib,
 
385
                       [Parent, Name, StateName, StateData, Mod,
 
386
                        Debug, Limits]);
360
387
%% First we test if we have reach a defined limit ...
361
 
loop(Parent, Name, StateName, StateData, Mod, Time, Debug, Limits) ->
 
388
loop(Parent, Name, StateName, StateData, Mod, Time, Debug,
 
389
     Limits, Queue, QueueLen) ->
362
390
    try         
363
 
        message_queue_len(Limits)
 
391
        message_queue_len(Limits, QueueLen)
364
392
        %% TODO: We can add more limit checking here...
365
393
    catch
366
394
        {process_limit, Limit} ->
369
397
            terminate(Reason, Name, Msg, Mod, StateName, StateData, Debug)
370
398
    end,
371
399
    process_message(Parent, Name, StateName, StateData,
372
 
                    Mod, Time, Debug, Limits).
 
400
                    Mod, Time, Debug, Limits, Queue, QueueLen).
373
401
%% ... then we can process a new message:
374
 
process_message(Parent, Name, StateName, StateData, Mod, Time, Debug, Limits) ->
 
402
process_message(Parent, Name, StateName, StateData, Mod, Time, Debug,
 
403
                Limits, Queue, QueueLen) ->
 
404
    {Msg, Queue1, QueueLen1} = collect_messages(Queue, QueueLen, Time),
 
405
    decode_msg(Msg,Parent, Name, StateName, StateData, Mod, Time,
 
406
               Debug, Limits, Queue1, QueueLen1, false).
 
407
 
 
408
collect_messages(Queue, QueueLen, Time) ->
 
409
    receive
 
410
        Input ->
 
411
            case Input of
 
412
                {'EXIT', _Parent, priority_shutdown} ->
 
413
                    {Input, Queue, QueueLen};
 
414
                _ ->
 
415
                    collect_messages(
 
416
                      queue:in(Input, Queue), QueueLen + 1, Time)
 
417
            end
 
418
    after 0 ->
 
419
            case queue:out(Queue) of
 
420
                {{value, Msg}, Queue1} ->
 
421
                    {Msg, Queue1, QueueLen - 1};
 
422
                {empty, _} ->
 
423
                    receive
 
424
                        Input ->
 
425
                            {Input, Queue, QueueLen}
 
426
                    after Time ->
 
427
                            {{'$gen_event', timeout}, Queue, QueueLen}
 
428
                    end
 
429
            end
 
430
    end.
 
431
 
 
432
 
 
433
wake_hib(Parent, Name, StateName, StateData, Mod, Debug,
 
434
         Limits) ->
375
435
    Msg = receive
376
 
              {'EXIT', Parent, priority_shutdown} ->
377
 
                  {'EXIT', Parent, priority_shutdown}
378
 
          after 0 ->
379
 
                  receive
380
 
                      Input ->
381
 
                          Input
382
 
                  after Time ->
383
 
                          {'$gen_event', timeout}
384
 
                  end
 
436
              Input ->
 
437
                  Input
385
438
          end,
 
439
    Queue = queue:new(),
 
440
    QueueLen = 0,
 
441
    decode_msg(Msg, Parent, Name, StateName, StateData, Mod, hibernate,
 
442
               Debug, Limits, Queue, QueueLen, true).
 
443
 
 
444
decode_msg(Msg,Parent, Name, StateName, StateData, Mod, Time, Debug,
 
445
           Limits, Queue, QueueLen, Hib) ->
 
446
    put('$internal_queue_len', QueueLen),
386
447
    case Msg of
387
448
        {system, From, Req} ->
388
449
            sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug,
389
450
                                  [Name, StateName, StateData,
390
 
                                   Mod, Time, Limits]);
 
451
                                   Mod, Time, Limits, Queue, QueueLen], Hib);
391
452
        {'EXIT', Parent, Reason} ->
392
453
            terminate(Reason, Name, Msg, Mod, StateName, StateData, Debug);
393
454
        _Msg when Debug == [] ->
394
455
            handle_msg(Msg, Parent, Name, StateName, StateData,
395
 
                       Mod, Time, Limits);
 
456
                       Mod, Time, Limits, Queue, QueueLen);
396
457
        _Msg ->
397
458
            Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, 
398
459
                                      {Name, StateName}, {in, Msg}),
399
460
            handle_msg(Msg, Parent, Name, StateName, StateData,
400
 
                       Mod, Time, Debug1, Limits)
 
461
                       Mod, Time, Debug1, Limits, Queue, QueueLen)
401
462
    end.
402
463
 
403
464
%%-----------------------------------------------------------------
404
465
%% Callback functions for system messages handling.
405
466
%%-----------------------------------------------------------------
406
 
%% TODO: Fix me
407
467
system_continue(Parent, Debug, [Name, StateName, StateData,
408
 
                                Mod, Time, Limits]) ->
409
 
    loop(Parent, Name, StateName, StateData, Mod, Time, Debug, Limits).
 
468
                                Mod, Time, Limits, Queue, QueueLen]) ->
 
469
    loop(Parent, Name, StateName, StateData, Mod, Time, Debug,
 
470
         Limits, Queue, QueueLen).
410
471
 
411
472
system_terminate(Reason, _Parent, Debug,
412
473
                 [Name, StateName, StateData, Mod, _Time, _Limits]) ->
413
474
    terminate(Reason, Name, [], Mod, StateName, StateData, Debug).
414
475
 
415
 
system_code_change([Name, StateName, StateData, Mod, Time, Limits],
 
476
system_code_change([Name, StateName, StateData, Mod, Time,
 
477
                    Limits, Queue, QueueLen],
416
478
                   _Module, OldVsn, Extra) ->
417
479
    case catch Mod:code_change(OldVsn, StateName, StateData, Extra) of
418
480
        {ok, NewStateName, NewStateData} ->
419
 
            {ok, [Name, NewStateName, NewStateData, Mod, Time, Limits]};
 
481
            {ok, [Name, NewStateName, NewStateData, Mod, Time,
 
482
                  Limits, Queue, QueueLen]};
420
483
        Else -> Else
421
484
    end.
422
485
 
453
516
    io:format(Dev, "*DBG* ~p switched to state ~w~n",
454
517
              [Name, StateName]).
455
518
 
456
 
handle_msg(Msg, Parent, Name, StateName, StateData, Mod, _Time, Limits) -> %No debug here
 
519
handle_msg(Msg, Parent, Name, StateName, StateData, Mod, _Time,
 
520
           Limits, Queue, QueueLen) -> %No debug here
457
521
    From = from(Msg),
458
522
    case catch dispatch(Msg, Mod, StateName, StateData) of
459
523
        {next_state, NStateName, NStateData} ->     
460
524
            loop(Parent, Name, NStateName, NStateData,
461
 
                 Mod, infinity, [], Limits);
 
525
                 Mod, infinity, [], Limits, Queue, QueueLen);
462
526
        {next_state, NStateName, NStateData, Time1} ->
463
 
            loop(Parent, Name, NStateName, NStateData, Mod, Time1, [], Limits);
464
 
        {reply, Reply, NStateName, NStateData} when From /= undefined ->
 
527
            loop(Parent, Name, NStateName, NStateData, Mod, Time1, [],
 
528
                 Limits, Queue, QueueLen);
 
529
        {reply, Reply, NStateName, NStateData} when From =/= undefined ->
465
530
            reply(From, Reply),
466
531
            loop(Parent, Name, NStateName, NStateData,
467
 
                 Mod, infinity, [], Limits);
468
 
        {reply, Reply, NStateName, NStateData, Time1} when From /= undefined ->
 
532
                 Mod, infinity, [], Limits, Queue, QueueLen);
 
533
        {reply, Reply, NStateName, NStateData, Time1} when From =/= undefined ->
469
534
            reply(From, Reply),
470
 
            loop(Parent, Name, NStateName, NStateData, Mod, Time1, [], Limits);
 
535
            loop(Parent, Name, NStateName, NStateData, Mod, Time1, [],
 
536
                 Limits, Queue, QueueLen);
471
537
        {stop, Reason, NStateData} ->
472
538
            terminate(Reason, Name, Msg, Mod, StateName, NStateData, []);
473
 
        {stop, Reason, Reply, NStateData} when From /= undefined ->
 
539
        {stop, Reason, Reply, NStateData} when From =/= undefined ->
474
540
            {'EXIT', R} = (catch terminate(Reason, Name, Msg, Mod,
475
541
                                           StateName, NStateData, [])),
476
542
            reply(From, Reply),
483
549
    end.
484
550
 
485
551
handle_msg(Msg, Parent, Name, StateName, StateData,
486
 
           Mod, _Time, Debug, Limits) ->
 
552
           Mod, _Time, Debug, Limits, Queue, QueueLen) ->
487
553
    From = from(Msg),
488
554
    case catch dispatch(Msg, Mod, StateName, StateData) of
489
555
        {next_state, NStateName, NStateData} ->
490
556
            Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, 
491
557
                                      {Name, NStateName}, return),
492
558
            loop(Parent, Name, NStateName, NStateData,
493
 
                 Mod, infinity, Debug1, Limits);
 
559
                 Mod, infinity, Debug1, Limits, Queue, QueueLen);
494
560
        {next_state, NStateName, NStateData, Time1} ->
495
561
            Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, 
496
562
                                      {Name, NStateName}, return),
497
563
            loop(Parent, Name, NStateName, NStateData,
498
 
                 Mod, Time1, Debug1, Limits);
499
 
        {reply, Reply, NStateName, NStateData} when From /= undefined ->
500
 
            Debug1 = reply(Name, From, Reply, Debug, NStateName),
501
 
            loop(Parent, Name, NStateName, NStateData,
502
 
                 Mod, infinity, Debug1, Limits);
503
 
        {reply, Reply, NStateName, NStateData, Time1} when From /= undefined ->
504
 
            Debug1 = reply(Name, From, Reply, Debug, NStateName),
505
 
            loop(Parent, Name, NStateName, NStateData,
506
 
                 Mod, Time1, Debug1, Limits);
 
564
                 Mod, Time1, Debug1, Limits, Queue, QueueLen);
 
565
        {reply, Reply, NStateName, NStateData} when From =/= undefined ->
 
566
            Debug1 = reply(Name, From, Reply, Debug, NStateName),
 
567
            loop(Parent, Name, NStateName, NStateData,
 
568
                 Mod, infinity, Debug1, Limits, Queue, QueueLen);
 
569
        {reply, Reply, NStateName, NStateData, Time1} when From =/= undefined ->
 
570
            Debug1 = reply(Name, From, Reply, Debug, NStateName),
 
571
            loop(Parent, Name, NStateName, NStateData,
 
572
                 Mod, Time1, Debug1, Limits, Queue, QueueLen);
507
573
        {stop, Reason, NStateData} ->
508
574
            terminate(Reason, Name, Msg, Mod, StateName, NStateData, Debug);
509
 
        {stop, Reason, Reply, NStateData} when From /= undefined ->
 
575
        {stop, Reason, Reply, NStateData} when From =/= undefined ->
510
576
            {'EXIT', R} = (catch terminate(Reason, Name, Msg, Mod,
511
577
                                           StateName, NStateData, Debug)),
512
578
            reply(Name, From, Reply, Debug, StateName),
663
729
    Limits;
664
730
%% Maximum number of messages allowed in the process message queue
665
731
limit_options([{max_queue,N}|Options], Limits) 
666
 
  when integer(N) ->
 
732
  when is_integer(N) ->
667
733
    NewLimits = Limits#limits{max_queue=N},
668
734
    limit_options(Options, NewLimits);
669
735
limit_options([_|Options], Limits) ->
671
737
 
672
738
%% Throw max_queue if we have reach the max queue size
673
739
%% Returns ok otherwise
674
 
message_queue_len(#limits{max_queue = undefined}) ->
 
740
message_queue_len(#limits{max_queue = undefined}, _QueueLen) ->
675
741
    ok;
676
 
message_queue_len(#limits{max_queue = MaxQueue}) ->
 
742
message_queue_len(#limits{max_queue = MaxQueue}, QueueLen) ->
677
743
    Pid = self(),
678
744
    case process_info(Pid, message_queue_len) of
679
 
        {message_queue_len, N} when N > MaxQueue ->
680
 
            throw({process_limit, {max_queue, N}});
 
745
        {message_queue_len, N} when N + QueueLen > MaxQueue ->
 
746
            throw({process_limit, {max_queue, N + QueueLen}});
681
747
        _ ->
682
748
            ok
683
749
    end.