~ubuntu-branches/ubuntu/trusty/mod-gearman/trusty

« back to all changes in this revision

Viewing changes to neb_module/mod_gearman.c

  • Committer: Bazaar Package Importer
  • Author(s): Stig Sandbeck Mathisen
  • Date: 2011-09-22 21:25:15 UTC
  • mfrom: (4.1.2 sid)
  • Revision ID: james.westby@ubuntu.com-20110922212515-w9amj0jz6vt9l36i
Tags: 1.0.10-1
* New upstream release
* Use upstream tarball instead of git repo

Show diffs side-by-side

added added

removed removed

Lines of Context:
47
47
void *gearman_module_handle=NULL;
48
48
gearman_client_st client;
49
49
 
50
 
int send_now               = FALSE;
51
 
int result_threads_running = 0;
 
50
int send_now, result_threads_running;
52
51
pthread_t result_thr[GM_LISTSIZE];
53
52
char target_queue[GM_BUFFERSIZE];
54
53
char temp_buffer[GM_BUFFERSIZE];
72
71
int nebmodule_init( int flags, char *args, nebmodule *handle ) {
73
72
    int i;
74
73
    int broker_option_errors = 0;
 
74
    send_now                 = FALSE;
 
75
    result_threads_running   = 0;
75
76
 
76
77
    /* save our handle */
77
78
    gearman_module_handle=handle;
88
89
    set_default_options(mod_gm_opt);
89
90
 
90
91
    /* parse arguments */
91
 
    read_arguments( args );
92
 
    gm_log( GM_LOG_INFO,  "Version %s\n", GM_VERSION );
93
 
    gm_log( GM_LOG_TRACE, "args: %s\n", args );
 
92
    gm_log( GM_LOG_DEBUG, "Version %s\n", GM_VERSION );
 
93
    gm_log( GM_LOG_DEBUG, "args: %s\n", args );
94
94
    gm_log( GM_LOG_TRACE, "nebmodule_init(%i, %i)\n", flags );
95
95
    gm_log( GM_LOG_DEBUG, "running on libgearman %s\n", gearman_version() );
96
96
 
 
97
    if( read_arguments( args ) == GM_ERROR )
 
98
        return NEB_ERROR;
 
99
 
97
100
    /* check for minimum eventbroker options */
98
101
    if(!(event_broker_options & BROKER_PROGRAM_STATE)) {
99
102
        gm_log( GM_LOG_ERROR, "mod_gearman needs BROKER_PROGRAM_STATE (%i) event_broker_options enabled to work\n", BROKER_PROGRAM_STATE );
154
157
    neb_register_callback( NEBCALLBACK_TIMED_EVENT_DATA, gearman_module_handle, 0, handle_timed_events );
155
158
 
156
159
    /* register export callbacks */
157
 
    for(i=0;i<=GM_NEBTYPESSIZE;i++) {
 
160
    for(i=0;i<GM_NEBTYPESSIZE;i++) {
158
161
        if(mod_gm_opt->exports[i]->elem_number > 0)
159
162
            neb_register_callback( i, gearman_module_handle, 0, handle_export );
160
163
    }
161
164
 
 
165
    /* log at least one line into the core logfile */
 
166
    if ( mod_gm_opt->logmode != GM_LOG_MODE_CORE ) {
 
167
        int logmode_saved = mod_gm_opt->logmode;
 
168
        mod_gm_opt->logmode = GM_LOG_MODE_CORE;
 
169
        gm_log( GM_LOG_INFO,  "initialized version %s (libgearman %s)\n", GM_VERSION, gearman_version() );
 
170
        mod_gm_opt->logmode = logmode_saved;
 
171
    }
 
172
 
162
173
    gm_log( GM_LOG_DEBUG, "finished initializing\n" );
163
174
 
164
175
    return NEB_OK;
217
228
    }
218
229
 
219
230
    /* deregister export callbacks */
220
 
    for(x=0;x<=GM_NEBTYPESSIZE;x++) {
 
231
    for(x=0;x<GM_NEBTYPESSIZE;x++) {
221
232
        if(mod_gm_opt->exports[x]->elem_number > 0)
222
233
            neb_deregister_callback( x, gearman_module_handle );
223
234
    }
227
238
    gm_log( GM_LOG_DEBUG, "deregistered callbacks\n" );
228
239
 
229
240
    /* stop result threads */
230
 
    for(x = 0; x < mod_gm_opt->result_workers; x++) {
 
241
    for(x = 0; x < result_threads_running; x++) {
231
242
        pthread_cancel(result_thr[x]);
232
243
        pthread_join(result_thr[x], NULL);
233
244
    }
234
245
 
235
 
    /* cleanup client */
 
246
    /* cleanup */
236
247
    free_client(&client);
237
248
 
 
249
    mod_gm_free_opt(mod_gm_opt);
 
250
 
238
251
    return NEB_OK;
239
252
}
240
253
 
388
401
    gm_log( GM_LOG_TRACE, "got eventhandler event: %s\n", ds->command_line );
389
402
 
390
403
    temp_buffer[0]='\x0';
391
 
    snprintf( temp_buffer,sizeof( temp_buffer )-1,"type=eventhandler\ncommand_line=%s\n\n\n",ds->command_line );
392
 
    temp_buffer[sizeof( temp_buffer )-1]='\x0';
 
404
    snprintf( temp_buffer,GM_BUFFERSIZE-1,"type=eventhandler\ncommand_line=%s\n\n\n",ds->command_line );
393
405
 
394
406
    if(add_job_to_queue( &client,
395
407
                         mod_gm_opt->server_list,
495
507
    gm_log( GM_LOG_TRACE, "cmd_line: %s\n", processed_command );
496
508
 
497
509
    temp_buffer[0]='\x0';
498
 
    snprintf( temp_buffer,sizeof( temp_buffer )-1,"type=host\nresult_queue=%s\nhost_name=%s\nstart_time=%i.0\ntimeout=%d\ncommand_line=%s\n\n\n",
 
510
    snprintf( temp_buffer,GM_BUFFERSIZE-1,"type=host\nresult_queue=%s\nhost_name=%s\nstart_time=%i.0\ntimeout=%d\ncommand_line=%s\n\n\n",
499
511
              mod_gm_opt->result_queue,
500
512
              hst->name,
501
513
              ( int )hst->next_check,
502
514
              host_check_timeout,
503
515
              processed_command
504
516
            );
505
 
    temp_buffer[sizeof( temp_buffer )-1]='\x0';
506
517
 
507
518
    if(add_job_to_queue( &client,
508
519
                         mod_gm_opt->server_list,
621
632
    gm_log( GM_LOG_TRACE, "cmd_line: %s\n", processed_command );
622
633
 
623
634
    temp_buffer[0]='\x0';
624
 
    snprintf( temp_buffer,sizeof( temp_buffer )-1,"type=service\nresult_queue=%s\nhost_name=%s\nservice_description=%s\nstart_time=%i.0\ntimeout=%d\ncommand_line=%s\n\n\n",
 
635
    snprintf( temp_buffer,GM_BUFFERSIZE-1,"type=service\nresult_queue=%s\nhost_name=%s\nservice_description=%s\nstart_time=%i.0\ntimeout=%d\ncommand_line=%s\n\n\n",
625
636
              mod_gm_opt->result_queue,
626
637
              svcdata->host_name,
627
638
              svcdata->service_description,
629
640
              service_check_timeout,
630
641
              processed_command
631
642
            );
632
 
    temp_buffer[sizeof( temp_buffer )-1]='\x0';
633
643
 
634
644
    uniq[0]='\x0';
635
 
    snprintf( uniq,sizeof( temp_buffer )-1,"%s-%s", svcdata->host_name, svcdata->service_description);
 
645
    snprintf( uniq,GM_BUFFERSIZE-1,"%s-%s", svcdata->host_name, svcdata->service_description);
636
646
 
637
647
    /* execute forced checks with high prio as they are probably user requested */
638
648
    if(check_result_info.check_options & CHECK_OPTION_FORCE_EXECUTION)
783
793
            servicegroup * temp_servicegroup = find_servicegroup( mod_gm_opt->servicegroups_list[x] );
784
794
            if ( is_service_member_of_servicegroup( temp_servicegroup,svc )==TRUE ) {
785
795
                gm_log( GM_LOG_TRACE, "service is member of servicegroup: %s\n", mod_gm_opt->servicegroups_list[x] );
786
 
                snprintf( target_queue, sizeof(target_queue)-1, "servicegroup_%s", mod_gm_opt->servicegroups_list[x] );
787
 
                target_queue[sizeof( target_queue )-1]='\x0';
 
796
                snprintf( target_queue, GM_BUFFERSIZE-1, "servicegroup_%s", mod_gm_opt->servicegroups_list[x] );
788
797
                return;
789
798
            }
790
799
            x++;
797
806
        hostgroup * temp_hostgroup = find_hostgroup( mod_gm_opt->hostgroups_list[x] );
798
807
        if ( is_host_member_of_hostgroup( temp_hostgroup,hst )==TRUE ) {
799
808
            gm_log( GM_LOG_TRACE, "server is member of hostgroup: %s\n", mod_gm_opt->hostgroups_list[x] );
800
 
            snprintf( target_queue, sizeof(target_queue)-1, "hostgroup_%s", mod_gm_opt->hostgroups_list[x] );
801
 
            target_queue[sizeof( target_queue )-1]='\x0';
 
809
            snprintf( target_queue, GM_BUFFERSIZE-1, "hostgroup_%s", mod_gm_opt->hostgroups_list[x] );
802
810
            return;
803
811
        }
804
812
        x++;
807
815
    if ( svc ) {
808
816
        /* pass into the general service queue */
809
817
        if ( mod_gm_opt->services == GM_ENABLED && svc ) {
810
 
            snprintf( target_queue, sizeof(target_queue)-1, "service" );
811
 
            target_queue[sizeof( target_queue )-1]='\x0';
 
818
            snprintf( target_queue, GM_BUFFERSIZE-1, "service" );
812
819
            return;
813
820
        }
814
821
    }
815
822
    else {
816
823
        /* pass into the general host queue */
817
824
        if ( mod_gm_opt->hosts == GM_ENABLED ) {
818
 
            snprintf( target_queue, sizeof(target_queue)-1, "host" );
819
 
            target_queue[sizeof( target_queue )-1]='\x0';
 
825
            snprintf( target_queue, GM_BUFFERSIZE-1, "host" );
820
826
            return;
821
827
        }
822
828
    }
827
833
 
828
834
/* start our threads */
829
835
static void start_threads(void) {
830
 
    if ( !result_threads_running ) {
 
836
    if ( result_threads_running < mod_gm_opt->result_workers ) {
831
837
        /* create result worker */
832
838
        int x;
833
839
        for(x = 0; x < mod_gm_opt->result_workers; x++) {
870
876
                }
871
877
 
872
878
                uniq[0]='\x0';
873
 
                snprintf( uniq,sizeof( temp_buffer )-1,"%s", hostchkdata->host_name);
 
879
                snprintf( uniq,GM_BUFFERSIZE-1,"%s", hostchkdata->host_name);
874
880
 
875
881
                temp_buffer[0]='\x0';
876
 
                snprintf( temp_buffer,sizeof( temp_buffer )-1,
 
882
                snprintf( temp_buffer,GM_BUFFERSIZE-1,
877
883
                            "DATATYPE::HOSTPERFDATA\t"
878
884
                            "TIMET::%d\t"
879
885
                            "HOSTNAME::%s\t"
885
891
                            hostchkdata->host_name, hostchkdata->perf_data,
886
892
                            hostchkdata->command_name, hostchkdata->command_args,
887
893
                            hostchkdata->state, hostchkdata->state_type);
888
 
                temp_buffer[sizeof( temp_buffer )-1]='\x0';
889
894
                has_perfdata = TRUE;
890
895
            }
891
896
            break;
906
911
                }
907
912
 
908
913
                uniq[0]='\x0';
909
 
                snprintf( uniq,sizeof( temp_buffer )-1,"%s-%s", srvchkdata->host_name, srvchkdata->service_description);
 
914
                snprintf( uniq,GM_BUFFERSIZE-1,"%s-%s", srvchkdata->host_name, srvchkdata->service_description);
910
915
 
911
916
                temp_buffer[0]='\x0';
912
 
                snprintf( temp_buffer,sizeof( temp_buffer )-1,
 
917
                snprintf( temp_buffer,GM_BUFFERSIZE-1,
913
918
                            "DATATYPE::SERVICEPERFDATA\t"
914
919
                            "TIMET::%d\t"
915
920
                            "HOSTNAME::%s\t"
922
927
                            srvchkdata->host_name, srvchkdata->service_description,
923
928
                            srvchkdata->perf_data, svc->service_check_command,
924
929
                            srvchkdata->state, srvchkdata->state_type);
925
 
                temp_buffer[sizeof( temp_buffer )-1]='\x0';
 
930
                temp_buffer[GM_BUFFERSIZE-1]='\x0';
926
931
                has_perfdata = TRUE;
927
932
            }
928
933
            break;
987
992
        case NEBCALLBACK_PROCESS_DATA:                      /*  7 */
988
993
            npd    = (nebstruct_process_data *)data;
989
994
            type   = nebtype2str(npd->type);
990
 
            snprintf( temp_buffer,sizeof( temp_buffer )-1, "{\"callback_type\":\"%s\",\"type\":\"%s\",\"flags\":%d,\"attr\":%d,\"timestamp\":%d.%d}",
 
995
            snprintf( temp_buffer,GM_BUFFERSIZE-1, "{\"callback_type\":\"%s\",\"type\":\"%s\",\"flags\":%d,\"attr\":%d,\"timestamp\":%d.%d}",
991
996
                    "NEBCALLBACK_PROCESS_DATA",
992
997
                    type,
993
998
                    npd->flags,
1000
1005
            nted       = (nebstruct_timed_event_data *)data;
1001
1006
            event_type = eventtype2str(nted->event_type);
1002
1007
            type       = nebtype2str(nted->type);
1003
 
            snprintf( temp_buffer,sizeof( temp_buffer )-1, "{\"callback_type\":\"%s\",\"event_type\":\"%s\",\"type\":\"%s\",\"flags\":%d,\"attr\":%d,\"timestamp\":%d.%d,\"recurring\":%d,\"run_time\":%d}",
 
1008
            snprintf( temp_buffer,GM_BUFFERSIZE-1, "{\"callback_type\":\"%s\",\"event_type\":\"%s\",\"type\":\"%s\",\"flags\":%d,\"attr\":%d,\"timestamp\":%d.%d,\"recurring\":%d,\"run_time\":%d}",
1004
1009
                    "NEBCALLBACK_TIMED_EVENT_DATA",
1005
1010
                    event_type,
1006
1011
                    type,
1017
1022
            nld    = (nebstruct_log_data *)data;
1018
1023
            buffer = escapestring(nld->data);
1019
1024
            type   = nebtype2str(nld->type);
1020
 
            snprintf( temp_buffer,sizeof( temp_buffer )-1, "{\"callback_type\":\"%s\",\"type\":\"%s\",\"flags\":%d,\"attr\":%d,\"timestamp\":%d.%d,\"entry_time\":%d,\"data_type\":%d,\"data\":\"%s\"}",
 
1025
            snprintf( temp_buffer,GM_BUFFERSIZE-1, "{\"callback_type\":\"%s\",\"type\":\"%s\",\"flags\":%d,\"attr\":%d,\"timestamp\":%d.%d,\"entry_time\":%d,\"data_type\":%d,\"data\":\"%s\"}",
1021
1026
                    "NEBCALLBACK_LOG_DATA",
1022
1027
                    type,
1023
1028
                    nld->flags,
1081
1086
            return 0;
1082
1087
    }
1083
1088
 
1084
 
    temp_buffer[sizeof( temp_buffer )-1]='\x0';
1085
1089
    if(temp_buffer[0] != '\x0') {
1086
1090
 
1087
1091
        for(i=0;i<mod_gm_opt->exports[callback_type]->elem_number;i++) {