1
/*___INFO__MARK_BEGIN__*/
2
/*************************************************************************
4
* The Contents of this file are made available subject to the terms of
5
* the Sun Industry Standards Source License Version 1.2
7
* Sun Microsystems Inc., March, 2001
10
* Sun Industry Standards Source License Version 1.2
11
* =================================================
12
* The contents of this file are subject to the Sun Industry Standards
13
* Source License Version 1.2 (the "License"); You may not use this file
14
* except in compliance with the License. You may obtain a copy of the
15
* License at http://gridengine.sunsource.net/Gridengine_SISSL_license.html
17
* Software provided under this License is provided on an "AS IS" basis,
18
* WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING,
19
* WITHOUT LIMITATION, WARRANTIES THAT THE SOFTWARE IS FREE OF DEFECTS,
20
* MERCHANTABLE, FIT FOR A PARTICULAR PURPOSE, OR NON-INFRINGING.
21
* See the License for the specific provisions governing your rights and
22
* obligations concerning the Software.
24
* The Initial Developer of the Original Code is: Sun Microsystems, Inc.
26
* Copyright: 2001 by Sun Microsystems, Inc.
28
* All Rights Reserved.
30
************************************************************************/
31
/*___INFO__MARK_END__*/
44
#include "cl_commlib.h"
46
#include "uti/sge_profiling.h"
48
/* Size in bytes of the event payload: my_event_thread() declares
 * `char help[DATA_SIZE]` and passes DATA_SIZE as the send length. */
#define DATA_SIZE 5000
50
/* Signal handler that main() installs with sigaction(). */
void sighandler_client(int sig);
51
/* main()'s statistics loop keeps running while this is 0.
 * NOTE(review): the code that sets it is not visible in this listing; if it
 * is written from the signal handler it should be `volatile sig_atomic_t`
 * -- confirm. */
static int do_shutdown = 0;
55
/* Counters that main() prints together with their per-second rates.
 * NOTE(review): the increments are not visible in this listing; if the
 * worker threads update them, no synchronization is visible -- confirm. */
static int rcv_messages = 0;
56
static int snd_messages = 0;
57
static int evc_count = 0;
58
static int events_sent = 0;
60
/* Communication handle; assigned in main() from cl_com_create_handle() and
 * used by both worker thread functions. */
static cl_com_handle_t* handle = NULL;
61
/* Thread list; set up in main() with cl_thread_list_setup() and passed to
 * cl_thread_list_create_thread() for every worker thread. */
cl_raw_list_t* thread_list = NULL;
63
/* Number of slots in event_client_array. */
#define MAX_EVENT_CLIENTS 1000
64
/* Registered event clients. A NULL slot counts as free: my_message_thread()
 * stores a sender endpoint into a NULL slot, my_event_thread() sends to every
 * non-NULL slot and calls cl_com_free_endpoint() on a slot whose send did not
 * return CL_RETVAL_OK.
 * NOTE(review): the code itself remarks that this access "should be locked";
 * no locking is visible in this listing. */
cl_com_endpoint_t* event_client_array[MAX_EVENT_CLIENTS];
66
/* Worker thread entry point, defined later in this file. */
void *my_message_thread(void *t_conf);
68
/*
 * do_nothing() - formats the literal "hallo" into `help` with sprintf().
 *
 * NOTE(review): the declaration of `help` and the closing brace of this
 * function are not present in this listing. sprintf() is unbounded, so the
 * buffer size should be checked against the string written -- confirm.
 */
void do_nothing(void) {
71
sprintf(help,"hallo");
74
/* Worker thread entry point, defined later in this file; main() starts it
 * under the thread name "event_thread". */
void *my_event_thread(void *t_conf);
77
/*
 * my_application_status() - status callback that main() registers with
 * cl_com_set_status_func().
 *
 * info_message: optional out-parameter. When it is not NULL, it receives a
 *               strdup() copy of "not specified (state 1)".
 *
 * Returns 1.
 *
 * NOTE(review): the strdup() result is heap-allocated and is not checked for
 * NULL; who frees it is not visible here -- presumably the caller, confirm.
 * The closing braces of this function are not present in this listing.
 */
unsigned long my_application_status(char** info_message) {
78
if ( info_message != NULL ) {
79
*info_message = strdup("not specified (state 1)");
81
return (unsigned long)1;
84
/*
 * sighandler_client() - the single handler that main() installs for SIGINT,
 * SIGTERM, SIGHUP and SIGPIPE.
 *
 * NOTE(review): the parameter list and the body are not present in this
 * listing (the prototype near the top of the file takes `int sig`).
 * Presumably it sets do_shutdown so that main() leaves its loop -- confirm.
 */
void sighandler_client(
87
/* thread_signal_receiver = pthread_self(); */
95
/* shutdown all sockets */
102
/* Redefine the function-name macro for main(). */
#ifdef __CL_FUNCTION__
103
#undef __CL_FUNCTION__
105
#define __CL_FUNCTION__ "main()"
106
/*
 * main() - test server "virtual_master".
 *
 * Command line (taken from the usage message below):
 *   argv[1] DEBUG_LEVEL  passed to cl_com_setup_commlib() as log level
 *   argv[2] PORT         service port passed to cl_com_create_handle()
 *   argv[3] INTERVAL     stored in time_interval and compared against the
 *                        elapsed seconds in the statistics loop
 *
 * Flow visible here: install the signal handler, start the commlib, create
 * the service handle, start two message threads and one event thread, print
 * a statistics line per loop pass until do_shutdown becomes non-zero, then
 * delete the threads and clean up the commlib.
 *
 * NOTE(review): several lines of this function are not present in this
 * listing (the opening brace, the declarations of sa, i, now, last,
 * unread_msg and unsend_msg, the argument-count check, closing braces and
 * the return). The arguments are converted with atoi(), which reports no
 * errors.
 */
extern int main(int argc, char** argv)
108
cl_thread_settings_t* thread_p = NULL;
109
cl_thread_settings_t* dummy_thread_p = NULL;
113
double usec_last = 0.0;
115
int time_interval = 0;
121
printf("syntax: test_virtual_qmaster DEBUG_LEVEL PORT INTERVAL\n");
125
time_interval = atoi(argv[3]);
127
/* set up signal handling */
128
memset(&sa, 0, sizeof(sa));
129
sa.sa_handler = sighandler_client; /* one handler for all signals */
130
sigemptyset(&sa.sa_mask);
131
sigaction(SIGINT, &sa, NULL);
132
sigaction(SIGTERM, &sa, NULL);
133
sigaction(SIGHUP, &sa, NULL);
134
sigaction(SIGPIPE, &sa, NULL);
136
printf("startup commlib ...\n");
137
cl_com_setup_commlib(CL_RW_THREAD /* CL_THREAD_POOL */ , (cl_log_t)atoi(argv[1]), NULL);
138
cl_com_set_status_func(my_application_status);
140
printf("setting up service on port %d\n", atoi(argv[2]) );
141
handle=cl_com_create_handle(NULL,CL_CT_TCP,CL_CM_CT_MESSAGE , CL_TRUE, atoi(argv[2]) , CL_TCP_DEFAULT,"virtual_master", 1 , 1,0 );
142
if (handle == NULL) {
143
printf("could not get handle\n");
147
/* NOTE(review): the next line ends with ',' rather than ';', so the call and
 * the following printf() form one comma expression; a ';' looks intended. */
cl_com_get_service_port(handle,&i),
148
printf("server running on host \"%s\", port %d, component name is \"%s\", id is %ld\n",
149
handle->local->comp_host,
151
handle->local->comp_name,
152
handle->local->comp_id);
154
printf("create application threads ...\n");
155
cl_thread_list_setup(&thread_list,"thread list");
/* two threads running my_message_thread() and one running my_event_thread() */
156
cl_thread_list_create_thread(thread_list, &dummy_thread_p,cl_com_get_log_list(), "message_thread_1", 100, my_message_thread, NULL, NULL);
158
cl_thread_list_create_thread(thread_list, &dummy_thread_p,cl_com_get_log_list(), "message_thread_2", 101, my_message_thread, NULL, NULL);
160
cl_thread_list_create_thread(thread_list, &dummy_thread_p,cl_com_get_log_list(), "event_thread", 3, my_event_thread, NULL, NULL);
162
/* remember the start time in microseconds */
gettimeofday(&last,NULL);
163
usec_last = (last.tv_sec * 1000000.0) + last.tv_usec;
165
/* statistics loop: one output line per pass until shutdown is requested */
while(do_shutdown == 0 ) {
166
double usec_now = 0.0;
167
double interval = 0.0;
168
double rcv_m_sec = 0.0;
169
double snd_m_sec = 0.0;
170
double nr_evc_sec = 0.0;
171
double snd_ev_sec = 0.0;
172
cl_com_handle_statistic_t* statistic_data = NULL;
175
int nr_of_connections = 0;
177
gettimeofday(&now,NULL);
178
usec_now = (now.tv_sec * 1000000.0) + now.tv_usec;
180
/* elapsed time since start, converted from microseconds to seconds */
interval = usec_now - usec_last;
181
interval /= 1000000.0;
183
/* the rates are only computed for a positive interval (avoids dividing by 0) */
if (interval > 0.0) {
184
rcv_m_sec = rcv_messages / interval;
185
snd_m_sec = snd_messages / interval;
186
nr_evc_sec = evc_count / interval;
187
snd_ev_sec = events_sent / interval;
190
/* copy the values of interest out of the statistic struct, then release it */
cl_com_get_actual_statistic_data(handle, &statistic_data);
191
if (statistic_data != NULL) {
192
unread_msg = (int)statistic_data->unread_message_count;
193
unsend_msg = (int)statistic_data->unsend_message_count;
194
nr_of_connections = (int)statistic_data->nr_of_connections;
195
cl_com_free_handle_statistic(&statistic_data);
197
printf("|%.5f|[s] received|%d|%.3f|[nr.|1/s] sent|%d|%.3f|[nr.|1/s] event clients|%d|%.3f|[nr.|1/s] events sent|%d|%.3f|[nr.|1/s] rcv_buf|%d|snd_buf|%d| nr_connections|%d|\n",
199
rcv_messages, rcv_m_sec,
200
snd_messages, snd_m_sec,
201
evc_count, nr_evc_sec,
202
events_sent, snd_ev_sec,
208
/* wait between two output lines; the same (1, 0) arguments are described
 * as "sleep 1 sec" by a comment in my_event_thread() */
cl_thread_wait_for_event(cl_thread_get_thread_config(),1,0);
/* NOTE(review): the body of this check is not present in this listing;
 * presumably it requests shutdown once INTERVAL seconds elapsed -- confirm. */
209
if ( interval >= time_interval ) {
213
printf("shutdown threads ...\n");
214
/* delete all threads */
215
while ( (thread_p=cl_thread_list_get_first_thread(thread_list)) != NULL ) {
216
gettimeofday(&now,NULL);
/* NOTE(review): %ld is paired with an (unsigned long) argument here and in
 * the " done" message below; %lu would match the cast. */
217
printf("shutting down thread %s (%ld)...", thread_p->thread_name, (unsigned long) now.tv_sec);
219
cl_thread_list_delete_thread(thread_list, thread_p);
220
gettimeofday(&now,NULL);
221
printf(" done (%ld)\n", (unsigned long)now.tv_sec);
223
cl_thread_list_cleanup(&thread_list);
225
cl_com_ignore_timeouts(CL_TRUE);
227
printf("shutdown commlib ...\n");
228
cl_com_cleanup_commlib();
230
printf("main done\n");
236
/* Redefine the function-name macro for my_message_thread(). */
#ifdef __CL_FUNCTION__
237
#undef __CL_FUNCTION__
239
#define __CL_FUNCTION__ "my_message_thread"
/*
 * my_message_thread() - thread function that main() starts twice
 * ("message_thread_1" and "message_thread_2").
 *
 * t_conf: the thread's cl_thread_settings_t, passed as void pointer.
 *
 * Each loop pass triggers the commlib on the global handle and tries to
 * receive one message. A message whose payload equals "event" registers the
 * sender in event_client_array; any other message is sent back to its
 * sender.
 *
 * NOTE(review): several lines of this function are not present in this
 * listing (the declarations of do_exit, ret_val, i and d, the remaining
 * arguments of the receive and send calls, else branches, closing braces and
 * the return value).
 */
240
void *my_message_thread(void *t_conf) {
242
/* get pointer to cl_thread_settings_t struct */
243
cl_thread_settings_t *thread_config = (cl_thread_settings_t*)t_conf;
245
/* set thread config data */
246
if (cl_thread_set_thread_config(thread_config) != CL_RETVAL_OK) {
247
CL_LOG(CL_LOG_ERROR,"thread setup error");
252
CL_LOG(CL_LOG_INFO, "starting initialization ...");
254
/* thread init done, trigger startup condition variable */
255
cl_thread_func_startup(thread_config);
256
CL_LOG(CL_LOG_INFO, "starting main loop ...");
258
printf(" \"%s\" -> running ...\n", thread_config->thread_name );
264
/* ok, thread main */
265
while (do_exit == 0) {
267
cl_com_message_t* message = NULL;
268
cl_com_endpoint_t* sender = NULL;
270
cl_thread_func_testcancel(thread_config);
272
cl_commlib_trigger(handle, 1);
273
ret_val = cl_commlib_receive_message(handle, NULL, NULL, 0, /* handle, comp_host, comp_name , comp_id, */
274
CL_FALSE, 0, /* syncron, response_mid */
276
if (ret_val == CL_RETVAL_OK) {
/* NOTE(review): comp_id and message_length are printed with %ld and the
 * payload with %s; their declared types are not visible here -- confirm the
 * specifiers match and that the payload is NUL-terminated. */
279
printf(" \"%s\" -> received message from %s/%s/%ld: \"%s\" (%ld bytes)\n", thread_config->thread_name,
280
sender->comp_host,sender->comp_name,sender->comp_id,
281
message->message, message->message_length);
284
/* the payload "event" marks the sender as a new event client */
if (strcmp((char*)message->message,"event") == 0) {
287
printf(" \"%s\" -> new event client\n", thread_config->thread_name);
290
/* the message is no longer needed; only the sender endpoint is kept */
cl_com_free_message(&message);
292
for (i=0;i<MAX_EVENT_CLIENTS;i++) {
293
if ( event_client_array[i] == NULL ) {
/* the array slot now refers to the endpoint, so it is not freed on this path */
294
event_client_array[i] = sender;
/* NOTE(review): "to much" in the runtime message below should read
 * "too many"; left unchanged here because it is program output. */
301
printf(" \"%s\" -> to much connected event clients\n", thread_config->thread_name);
302
cl_com_free_endpoint(&sender);
305
/* no event client, just return message to sender */
307
printf(" \"%s\" -> send gdi response to %s/%s/%ld\n", thread_config->thread_name,
308
sender->comp_host, sender->comp_name, sender->comp_id);
312
/* simulate some work for the gdi thread */
/* NOTE(review): the body of this 18000-pass loop is not present in this listing. */
315
for (d=0;d<18000;d++) {
/* send the received payload back to the endpoint it came from */
320
ret_val = cl_commlib_send_message(handle, sender->comp_host, sender->comp_name, sender->comp_id,
322
&(message->message), message->message_length,
323
NULL, 0, 0 , CL_FALSE, CL_FALSE);
324
if (ret_val == CL_RETVAL_OK) {
/* release the message and the sender endpoint of this pass */
327
cl_com_free_message(&message);
328
cl_com_free_endpoint(&sender);
333
/* at least set exit state */
334
cl_thread_func_cleanup(thread_config);
339
#ifdef __CL_FUNCTION__
340
#undef __CL_FUNCTION__
342
#define __CL_FUNCTION__ "my_event_thread()"
343
void *my_event_thread(void *t_conf) {
345
/* get pointer to cl_thread_settings_t struct */
346
cl_thread_settings_t *thread_config = (cl_thread_settings_t*)t_conf;
347
cl_byte_t *reference = NULL;
349
/* set thread config data */
350
if (cl_thread_set_thread_config(thread_config) != CL_RETVAL_OK) {
351
CL_LOG(CL_LOG_ERROR,"thread setup error");
357
/* thread init done, trigger startup conditon variable*/
358
cl_thread_func_startup(thread_config);
359
CL_LOG(CL_LOG_INFO, "starting main loop ...");
360
printf(" \"%s\" -> running ...\n", thread_config->thread_name );
363
/* ok, thread main */
364
while (do_exit == 0) {
367
static int event_nr = 0;
370
cl_thread_func_testcancel(thread_config);
372
/* this should be 60 events/second per event client*/
373
for(nr=0;nr<60;nr++) {
375
for (i=0;i<MAX_EVENT_CLIENTS;i++) {
376
cl_com_endpoint_t* client = event_client_array[i];
377
if ( client != NULL) {
378
char help[DATA_SIZE];
379
memset(help, 0, DATA_SIZE);
385
sprintf(help,"event nr.: %d", event_nr );
387
printf(" \"%s\" -> sending event to %s/%s/%ld\n", thread_config->thread_name,
388
client->comp_host, client->comp_name, client->comp_id );
390
reference = (cl_byte_t *)help;
391
ret_val = cl_commlib_send_message(handle, client->comp_host, client->comp_name, client->comp_id,
392
CL_MIH_MAT_NAK, &reference, DATA_SIZE,
393
NULL, 0, 0 , CL_TRUE, CL_FALSE);
395
if ( ret_val != CL_RETVAL_OK) {
396
cl_com_free_endpoint(&(event_client_array[i])); /* should be locked at this point */
405
if ((ret_val = cl_thread_wait_for_event(thread_config,1, 0 )) != CL_RETVAL_OK) { /* nothing to do sleep 1 sec */
407
case CL_RETVAL_CONDITION_WAIT_TIMEOUT:
408
CL_LOG(CL_LOG_INFO,"condition wait timeout");
411
CL_LOG_STR( CL_LOG_INFO, ">got error<: ", cl_get_error_text(ret_val));
417
CL_LOG(CL_LOG_INFO, "exiting ...");
418
/* at least set exit state */
419
cl_thread_func_cleanup(thread_config);