~ubuntu-branches/ubuntu/utopic/gridengine/utopic

« back to all changes in this revision

Viewing changes to source/libs/comm/test_virtual_qmaster.c

  • Committer: Bazaar Package Importer
  • Author(s): Mark Hymers
  • Date: 2008-06-25 22:36:13 UTC
  • Revision ID: james.westby@ubuntu.com-20080625223613-tvd9xlhuoct9kyhm
Tags: upstream-6.2~beta2
ImportĀ upstreamĀ versionĀ 6.2~beta2

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*___INFO__MARK_BEGIN__*/
 
2
/*************************************************************************
 
3
 * 
 
4
 *  The Contents of this file are made available subject to the terms of
 
5
 *  the Sun Industry Standards Source License Version 1.2
 
6
 * 
 
7
 *  Sun Microsystems Inc., March, 2001
 
8
 * 
 
9
 * 
 
10
 *  Sun Industry Standards Source License Version 1.2
 
11
 *  =================================================
 
12
 *  The contents of this file are subject to the Sun Industry Standards
 
13
 *  Source License Version 1.2 (the "License"); You may not use this file
 
14
 *  except in compliance with the License. You may obtain a copy of the
 
15
 *  License at http://gridengine.sunsource.net/Gridengine_SISSL_license.html
 
16
 * 
 
17
 *  Software provided under this License is provided on an "AS IS" basis,
 
18
 *  WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING,
 
19
 *  WITHOUT LIMITATION, WARRANTIES THAT THE SOFTWARE IS FREE OF DEFECTS,
 
20
 *  MERCHANTABLE, FIT FOR A PARTICULAR PURPOSE, OR NON-INFRINGING.
 
21
 *  See the License for the specific provisions governing your rights and
 
22
 *  obligations concerning the Software.
 
23
 * 
 
24
 *   The Initial Developer of the Original Code is: Sun Microsystems, Inc.
 
25
 * 
 
26
 *   Copyright: 2001 by Sun Microsystems, Inc.
 
27
 * 
 
28
 *   All Rights Reserved.
 
29
 * 
 
30
 ************************************************************************/
 
31
/*___INFO__MARK_END__*/
 
32
 
 
33
 
 
34
#include <stdio.h>
 
35
#include <string.h>
 
36
#include <sys/time.h>
 
37
#include <stdlib.h>
 
38
#include <signal.h>
 
39
#include <unistd.h>
 
40
 
 
41
 
 
42
 
 
43
#include "cl_lists.h"
 
44
#include "cl_commlib.h"
 
45
 
 
46
#include "uti/sge_profiling.h"
 
47
 
 
48
#define DATA_SIZE 5000
 
49
 
 
50
void sighandler_client(int sig);
 
51
static int do_shutdown = 0;
 
52
 
 
53
/* counters */
 
54
 
 
55
static int rcv_messages = 0;
 
56
static int snd_messages = 0;
 
57
static int evc_count = 0;
 
58
static int events_sent = 0;
 
59
 
 
60
static cl_com_handle_t* handle = NULL; 
 
61
cl_raw_list_t* thread_list = NULL;
 
62
 
 
63
#define MAX_EVENT_CLIENTS 1000
 
64
cl_com_endpoint_t* event_client_array[MAX_EVENT_CLIENTS];
 
65
 
 
66
void *my_message_thread(void *t_conf);
 
67
 
 
68
void do_nothing(void) {
 
69
   char help[255];
 
70
 
 
71
   sprintf(help,"hallo");
 
72
}
 
73
 
 
74
void *my_event_thread(void *t_conf);
 
75
 
 
76
 
 
77
unsigned long my_application_status(char** info_message) {
 
78
   if ( info_message != NULL ) {
 
79
      *info_message = strdup("not specified (state 1)");
 
80
   }
 
81
   return (unsigned long)1;
 
82
}
 
83
 
 
84
void sighandler_client(
 
85
int sig 
 
86
) {
 
87
/*   thread_signal_receiver = pthread_self(); */
 
88
   if (sig == SIGPIPE) {
 
89
      return;
 
90
   }
 
91
 
 
92
   if (sig == SIGHUP) {
 
93
      return;
 
94
   }
 
95
   /* shutdown all sockets */
 
96
   do_shutdown = 1;
 
97
}
 
98
 
 
99
 
 
100
 
 
101
 
 
102
#ifdef __CL_FUNCTION__
 
103
#undef __CL_FUNCTION__
 
104
#endif
 
105
#define __CL_FUNCTION__ "main()"
 
106
extern int main(int argc, char** argv)
 
107
{
 
108
  cl_thread_settings_t* thread_p = NULL;
 
109
  cl_thread_settings_t* dummy_thread_p = NULL;
 
110
  struct sigaction sa;
 
111
  struct timeval now;
 
112
  struct timeval last;
 
113
  double usec_last = 0.0;
 
114
  int i;
 
115
  int time_interval = 0;
 
116
 
 
117
  prof_mt_init();
 
118
 
 
119
  if (argc != 4) {
 
120
      
 
121
      printf("syntax: test_virtual_qmaster DEBUG_LEVEL PORT INTERVAL\n");
 
122
      exit(1);
 
123
  }
 
124
 
 
125
  time_interval = atoi(argv[3]);
 
126
  
 
127
  /* setup signalhandling */
 
128
  memset(&sa, 0, sizeof(sa));
 
129
  sa.sa_handler = sighandler_client;  /* one handler for all signals */
 
130
  sigemptyset(&sa.sa_mask);
 
131
  sigaction(SIGINT, &sa, NULL);
 
132
  sigaction(SIGTERM, &sa, NULL);
 
133
  sigaction(SIGHUP, &sa, NULL);
 
134
  sigaction(SIGPIPE, &sa, NULL);
 
135
 
 
136
  printf("startup commlib ...\n");
 
137
  cl_com_setup_commlib(CL_RW_THREAD /* CL_THREAD_POOL */ , (cl_log_t)atoi(argv[1]), NULL);
 
138
  cl_com_set_status_func(my_application_status); 
 
139
 
 
140
  printf("setting up service on port %d\n", atoi(argv[2]) );
 
141
  handle=cl_com_create_handle(NULL,CL_CT_TCP,CL_CM_CT_MESSAGE , CL_TRUE, atoi(argv[2]) , CL_TCP_DEFAULT,"virtual_master", 1 , 1,0 );
 
142
  if (handle == NULL) {
 
143
     printf("could not get handle\n");
 
144
     exit(1);
 
145
  }
 
146
 
 
147
  cl_com_get_service_port(handle,&i), 
 
148
  printf("server running on host \"%s\", port %d, component name is \"%s\", id is %ld\n", 
 
149
         handle->local->comp_host, 
 
150
         i, 
 
151
         handle->local->comp_name,  
 
152
         handle->local->comp_id);
 
153
 
 
154
  printf("create application threads ...\n");
 
155
  cl_thread_list_setup(&thread_list,"thread list");
 
156
  cl_thread_list_create_thread(thread_list, &dummy_thread_p,cl_com_get_log_list(), "message_thread_1", 100, my_message_thread, NULL, NULL);
 
157
#if 1
 
158
  cl_thread_list_create_thread(thread_list, &dummy_thread_p,cl_com_get_log_list(), "message_thread_2", 101, my_message_thread, NULL, NULL);
 
159
#endif
 
160
  cl_thread_list_create_thread(thread_list, &dummy_thread_p,cl_com_get_log_list(), "event_thread", 3, my_event_thread, NULL, NULL);
 
161
 
 
162
  gettimeofday(&last,NULL);
 
163
  usec_last = (last.tv_sec * 1000000.0) + last.tv_usec;
 
164
  
 
165
  while(do_shutdown == 0 ) {
 
166
     double usec_now  = 0.0;
 
167
     double interval  = 0.0;
 
168
     double rcv_m_sec = 0.0;
 
169
     double snd_m_sec = 0.0;
 
170
     double nr_evc_sec = 0.0;
 
171
     double snd_ev_sec = 0.0;
 
172
     cl_com_handle_statistic_t* statistic_data = NULL;
 
173
     int unread_msg = 0;
 
174
     int unsend_msg = 0;
 
175
     int nr_of_connections = 0;
 
176
 
 
177
     gettimeofday(&now,NULL);
 
178
     usec_now  = (now.tv_sec  * 1000000.0) + now.tv_usec;
 
179
     
 
180
     interval = usec_now - usec_last;
 
181
     interval /= 1000000.0;
 
182
 
 
183
     if (interval > 0.0) {
 
184
        rcv_m_sec  = rcv_messages / interval;
 
185
        snd_m_sec  = snd_messages / interval;
 
186
        nr_evc_sec = evc_count    / interval;
 
187
        snd_ev_sec = events_sent  / interval;
 
188
     }
 
189
 
 
190
     cl_com_get_actual_statistic_data(handle, &statistic_data);
 
191
     if (statistic_data != NULL) {
 
192
        unread_msg = (int)statistic_data->unread_message_count;
 
193
        unsend_msg = (int)statistic_data->unsend_message_count;
 
194
        nr_of_connections = (int)statistic_data->nr_of_connections;
 
195
        cl_com_free_handle_statistic(&statistic_data);
 
196
     }
 
197
     printf("|%.5f|[s] received|%d|%.3f|[nr.|1/s] sent|%d|%.3f|[nr.|1/s] event clients|%d|%.3f|[nr.|1/s] events sent|%d|%.3f|[nr.|1/s] rcv_buf|%d|snd_buf|%d| nr_connections|%d|\n", 
 
198
            interval,
 
199
            rcv_messages, rcv_m_sec, 
 
200
            snd_messages, snd_m_sec,
 
201
            evc_count,    nr_evc_sec,
 
202
            events_sent,  snd_ev_sec,
 
203
            unread_msg,
 
204
            unsend_msg,
 
205
            nr_of_connections);
 
206
     fflush(stdout);
 
207
 
 
208
     cl_thread_wait_for_event(cl_thread_get_thread_config(),1,0);
 
209
     if ( interval >= time_interval ) {
 
210
        break;
 
211
     }
 
212
  }
 
213
  printf("shutdown threads ...\n");
 
214
  /* delete all threads */
 
215
  while ( (thread_p=cl_thread_list_get_first_thread(thread_list)) != NULL ) {
 
216
     gettimeofday(&now,NULL);
 
217
     printf("shutting down thread %s (%ld)...", thread_p->thread_name, (unsigned long) now.tv_sec);
 
218
     fflush(stdout);
 
219
     cl_thread_list_delete_thread(thread_list, thread_p);
 
220
     gettimeofday(&now,NULL);
 
221
     printf(" done (%ld)\n", (unsigned long)now.tv_sec);
 
222
  }
 
223
  cl_thread_list_cleanup(&thread_list);
 
224
 
 
225
  cl_com_ignore_timeouts(CL_TRUE);
 
226
 
 
227
  printf("shutdown commlib ...\n");
 
228
  cl_com_cleanup_commlib();
 
229
  fflush(stdout);
 
230
  printf("main done\n");
 
231
  return 0;
 
232
}
 
233
 
 
234
 
 
235
 
 
236
#ifdef __CL_FUNCTION__
 
237
#undef __CL_FUNCTION__
 
238
#endif
 
239
#define __CL_FUNCTION__ "my_message_thread"
 
240
void *my_message_thread(void *t_conf) {
 
241
   int do_exit = 0;
 
242
   /* get pointer to cl_thread_settings_t struct */
 
243
   cl_thread_settings_t *thread_config = (cl_thread_settings_t*)t_conf; 
 
244
 
 
245
   /* set thread config data */
 
246
   if (cl_thread_set_thread_config(thread_config) != CL_RETVAL_OK) {
 
247
      CL_LOG(CL_LOG_ERROR,"thread setup error"); 
 
248
      do_exit = 1;
 
249
   }
 
250
 
 
251
   /* setup thread */
 
252
   CL_LOG(CL_LOG_INFO, "starting initialization ...");
 
253
 
 
254
   /* thread init done, trigger startup conditon variable*/
 
255
   cl_thread_func_startup(thread_config);
 
256
   CL_LOG(CL_LOG_INFO, "starting main loop ...");
 
257
 
 
258
   printf(" \"%s\" -> running ...\n", thread_config->thread_name );
 
259
   rcv_messages = 0;
 
260
   snd_messages = 0;
 
261
   evc_count    = 0;
 
262
   
 
263
 
 
264
   /* ok, thread main */
 
265
   while (do_exit == 0) {
 
266
      int ret_val = 0;
 
267
      cl_com_message_t*  message = NULL;
 
268
      cl_com_endpoint_t* sender  = NULL;
 
269
 
 
270
      cl_thread_func_testcancel(thread_config);
 
271
 
 
272
      cl_commlib_trigger(handle, 1);
 
273
      ret_val = cl_commlib_receive_message(handle, NULL, NULL, 0,      /* handle, comp_host, comp_name , comp_id, */
 
274
                                           CL_FALSE, 0,                       /* syncron, response_mid */
 
275
                                           &message, &sender);
 
276
      if (ret_val == CL_RETVAL_OK) {
 
277
         rcv_messages++;
 
278
#if 0
 
279
         printf(" \"%s\" -> received message from %s/%s/%ld: \"%s\" (%ld bytes)\n", thread_config->thread_name, 
 
280
                                                                        sender->comp_host,sender->comp_name,sender->comp_id,
 
281
                                                                        message->message, message->message_length);
 
282
#endif
 
283
 
 
284
         if (strcmp((char*)message->message,"event") == 0) {
 
285
            int i,help;
 
286
#if 0
 
287
            printf(" \"%s\" -> new event client\n", thread_config->thread_name);
 
288
#endif
 
289
 
 
290
            cl_com_free_message(&message);
 
291
            help = 0;
 
292
            for (i=0;i<MAX_EVENT_CLIENTS;i++) {
 
293
               if ( event_client_array[i] == NULL ) {
 
294
                  event_client_array[i] = sender;
 
295
                  evc_count++;
 
296
                  help=1;
 
297
                  break;
 
298
               }
 
299
            }
 
300
            if (help != 1) {
 
301
               printf(" \"%s\" -> to much connected event clients\n", thread_config->thread_name);
 
302
               cl_com_free_endpoint(&sender);
 
303
            } 
 
304
         } else {
 
305
            /* no event client, just return message to sender */
 
306
#if 0
 
307
            printf(" \"%s\" -> send gdi response to %s/%s/%ld\n", thread_config->thread_name, 
 
308
                           sender->comp_host, sender->comp_name, sender->comp_id);
 
309
#endif
 
310
 
 
311
#if 0
 
312
           /* simulate a work for the gdi thread */
 
313
           {
 
314
              int d;
 
315
              for (d=0;d<18000;d++) {
 
316
                 do_nothing();
 
317
              }
 
318
            }
 
319
#endif
 
320
            ret_val = cl_commlib_send_message(handle, sender->comp_host, sender->comp_name, sender->comp_id,
 
321
                                      CL_MIH_MAT_NAK,
 
322
                                      &(message->message), message->message_length,
 
323
                                      NULL, 0, 0 , CL_FALSE, CL_FALSE);
 
324
            if (ret_val == CL_RETVAL_OK) {
 
325
               snd_messages++;
 
326
            }
 
327
            cl_com_free_message(&message);
 
328
            cl_com_free_endpoint(&sender);
 
329
         }
 
330
      }
 
331
   }
 
332
 
 
333
   /* at least set exit state */
 
334
   cl_thread_func_cleanup(thread_config);  
 
335
   return(NULL);
 
336
}
 
337
 
 
338
 
 
339
#ifdef __CL_FUNCTION__
 
340
#undef __CL_FUNCTION__
 
341
#endif
 
342
#define __CL_FUNCTION__ "my_event_thread()"
 
343
void *my_event_thread(void *t_conf) {
 
344
   int do_exit = 0;
 
345
   /* get pointer to cl_thread_settings_t struct */
 
346
   cl_thread_settings_t *thread_config = (cl_thread_settings_t*)t_conf; 
 
347
   cl_byte_t *reference = NULL;
 
348
 
 
349
   /* set thread config data */
 
350
   if (cl_thread_set_thread_config(thread_config) != CL_RETVAL_OK) {
 
351
      CL_LOG(CL_LOG_ERROR,"thread setup error"); 
 
352
      do_exit = 1;
 
353
   }
 
354
 
 
355
   /* setup thread */
 
356
 
 
357
   /* thread init done, trigger startup conditon variable*/
 
358
   cl_thread_func_startup(thread_config);
 
359
   CL_LOG(CL_LOG_INFO, "starting main loop ...");
 
360
   printf(" \"%s\" -> running ...\n", thread_config->thread_name );
 
361
 
 
362
   
 
363
   /* ok, thread main */
 
364
   while (do_exit == 0) {
 
365
      int ret_val;
 
366
      int i,first,nr;
 
367
      static int event_nr = 0;
 
368
      
 
369
 
 
370
      cl_thread_func_testcancel(thread_config);
 
371
 
 
372
      /* this should be 60 events/second per event client*/
 
373
      for(nr=0;nr<60;nr++) {
 
374
         first = 0;
 
375
         for (i=0;i<MAX_EVENT_CLIENTS;i++) {
 
376
            cl_com_endpoint_t* client = event_client_array[i];
 
377
            if ( client != NULL) {
 
378
               char help[DATA_SIZE];
 
379
               memset(help, 0, DATA_SIZE);
 
380
   
 
381
               if (first == 0) {
 
382
                  event_nr++;
 
383
                  first = 1;
 
384
               }
 
385
               sprintf(help,"event nr.: %d", event_nr );
 
386
#if 0
 
387
               printf(" \"%s\" -> sending event to %s/%s/%ld\n", thread_config->thread_name, 
 
388
                                                                 client->comp_host, client->comp_name, client->comp_id  );
 
389
#endif
 
390
               reference = (cl_byte_t *)help;
 
391
               ret_val = cl_commlib_send_message(handle, client->comp_host, client->comp_name, client->comp_id,
 
392
                                                 CL_MIH_MAT_NAK, &reference, DATA_SIZE,
 
393
                                                 NULL, 0, 0 , CL_TRUE, CL_FALSE);
 
394
             
 
395
               if ( ret_val != CL_RETVAL_OK) {
 
396
                  cl_com_free_endpoint(&(event_client_array[i])); /* should be locked at this point */
 
397
                  evc_count--;
 
398
               } else {
 
399
                  events_sent++;
 
400
               }
 
401
            }
 
402
         }
 
403
      }
 
404
 
 
405
      if ((ret_val = cl_thread_wait_for_event(thread_config,1, 0 )) != CL_RETVAL_OK) {  /* nothing to do sleep 1 sec */
 
406
         switch(ret_val) {
 
407
            case CL_RETVAL_CONDITION_WAIT_TIMEOUT:
 
408
               CL_LOG(CL_LOG_INFO,"condition wait timeout");
 
409
               break;
 
410
            default:
 
411
               CL_LOG_STR( CL_LOG_INFO, ">got error<: ", cl_get_error_text(ret_val));
 
412
               do_exit = 1;
 
413
         }
 
414
      }
 
415
   }
 
416
 
 
417
   CL_LOG(CL_LOG_INFO, "exiting ...");
 
418
   /* at least set exit state */
 
419
   cl_thread_func_cleanup(thread_config);  
 
420
   return(NULL);
 
421
}
 
422
 
 
423