1
/*___INFO__MARK_BEGIN__*/
2
/*************************************************************************
4
* The Contents of this file are made available subject to the terms of
5
* the Sun Industry Standards Source License Version 1.2
7
* Sun Microsystems Inc., March, 2001
10
* Sun Industry Standards Source License Version 1.2
11
* =================================================
12
* The contents of this file are subject to the Sun Industry Standards
13
* Source License Version 1.2 (the "License"); You may not use this file
14
* except in compliance with the License. You may obtain a copy of the
15
* License at http://gridengine.sunsource.net/Gridengine_SISSL_license.html
17
* Software provided under this License is provided on an "AS IS" basis,
18
* WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING,
19
* WITHOUT LIMITATION, WARRANTIES THAT THE SOFTWARE IS FREE OF DEFECTS,
20
* MERCHANTABLE, FIT FOR A PARTICULAR PURPOSE, OR NON-INFRINGING.
21
* See the License for the specific provisions governing your rights and
22
* obligations concerning the Software.
24
* The Initial Developer of the Original Code is: Sun Microsystems, Inc.
26
* Copyright: 2001 by Sun Microsystems, Inc.
28
* All Rights Reserved.
30
************************************************************************/
31
/*___INFO__MARK_END__*/
38
#include <sys/resource.h>
41
#include "sge_string.h"
44
#include "cl_commlib.h"
45
#include "cl_handle_list.h"
46
#include "cl_connection_list.h"
47
#include "cl_app_message_queue.h"
48
#include "cl_message_list.h"
49
#include "cl_host_list.h"
50
#include "cl_endpoint_list.h"
51
#include "cl_application_error_list.h"
52
#include "cl_host_alias_list.h"
53
#include "cl_parameter_list.h"
54
#include "cl_communication.h"
55
#include "cl_tcp_framework.h"
56
#include "cl_ssl_framework.h"
59
#include "msg_commlib.h"
61
/* enable more commlib logging by definening CL_DO_COMMLIB_DEBUG at compile time */
63
/* the next switch is used to send ack for messages when they arive
64
at commlib layer or when application removes them from commlib */
66
#define CL_DO_SEND_ACK_AT_COMMLIB_LAYER /* send ack when message arives */
69
static void cl_com_default_application_debug_client_callback(int dc_connected, int debug_level);
71
static int cl_commlib_check_callback_functions(void);
72
static int cl_commlib_check_connection_count(cl_com_handle_t* handle);
73
static int cl_commlib_calculate_statistic(cl_com_handle_t* handle, cl_bool_t force_update ,int lock_list);
74
static int cl_commlib_handle_debug_clients(cl_com_handle_t* handle, cl_bool_t lock_list);
76
static int cl_commlib_handle_connection_read(cl_com_connection_t* connection);
77
static int cl_commlib_handle_connection_write(cl_com_connection_t* connection);
78
static int cl_commlib_finish_request_completeness(cl_com_connection_t* connection);
80
static int cl_commlib_handle_connection_ack_timeouts(cl_com_connection_t* connection);
81
static int cl_commlib_send_ack_message(cl_com_connection_t* connection, cl_com_message_t* message);
82
static int cl_commlib_send_ccm_message(cl_com_connection_t* connection);
83
static int cl_commlib_send_ccrm_message(cl_com_connection_t* connection);
84
static int cl_commlib_send_sim_message(cl_com_connection_t* connection, unsigned long* mid);
85
static int cl_commlib_send_sirm_message(cl_com_connection_t* connection,
86
cl_com_message_t* message, /* message id of SIM */
87
unsigned long starttime,
88
unsigned long runtime,
89
unsigned long buffered_read_messages,
90
unsigned long buffered_write_messages,
91
unsigned long connection_count,
92
unsigned long application_status,
94
static int cl_com_trigger(cl_com_handle_t* handle, int synchron);
96
/* threads functions */
97
static void *cl_com_trigger_thread(void *t_conf);
98
static void *cl_com_handle_service_thread(void *t_conf);
99
static void *cl_com_handle_write_thread(void *t_conf);
100
static void *cl_com_handle_read_thread(void *t_conf);
104
static int cl_commlib_send_message_to_endpoint(cl_com_handle_t* handle,
105
cl_com_endpoint_t* endpoint,
106
cl_xml_ack_type_t ack_type,
109
unsigned long response_mid,
114
/* cl_com_handle_list
117
* Each entry in this list is a service handler connection */
118
static pthread_mutex_t cl_com_handle_list_mutex = PTHREAD_MUTEX_INITIALIZER;
119
static cl_raw_list_t* cl_com_handle_list = NULL;
125
* Each entry in this list can be logged via cl_log_list_flush()
127
* also used for setting cl_commlib_debug_resolvable_hosts and cl_commlib_debug_unresolvable_hosts
129
static pthread_mutex_t cl_com_log_list_mutex = PTHREAD_MUTEX_INITIALIZER;
130
static cl_raw_list_t* cl_com_log_list = NULL;
131
static char* cl_commlib_debug_resolvable_hosts = NULL;
132
static char* cl_commlib_debug_unresolvable_hosts = NULL;
137
* Each entry in this list is a cached cl_get_hostbyname()
138
or cl_gethostbyaddr() call */
139
static pthread_mutex_t cl_com_host_list_mutex = PTHREAD_MUTEX_INITIALIZER;
140
static cl_raw_list_t* cl_com_host_list = NULL;
142
/* cl_com_endpoint_list
143
* ====================
145
* Each entry in this list is a known endpoint */
146
static pthread_mutex_t cl_com_endpoint_list_mutex = PTHREAD_MUTEX_INITIALIZER;
147
static cl_raw_list_t* cl_com_endpoint_list = NULL;
149
/* cl_com_thread_list
152
* Each entry is a thread from cl_thread.c module
154
static pthread_mutex_t cl_com_thread_list_mutex = PTHREAD_MUTEX_INITIALIZER;
155
static cl_raw_list_t* cl_com_thread_list = NULL;
157
/* cl_com_application_error_list
158
* =============================
160
* Each entry is an error which has to provided to the application
163
static pthread_mutex_t cl_com_application_error_list_mutex = PTHREAD_MUTEX_INITIALIZER;
164
static cl_raw_list_t* cl_com_application_error_list = NULL;
166
/* cl_com_parameter_list
167
* =========================
169
* Each entry in this list is a parameter out
170
the qmaster params */
171
static pthread_mutex_t cl_com_parameter_list_mutex = PTHREAD_MUTEX_INITIALIZER;
172
static cl_raw_list_t* cl_com_parameter_list = NULL;
174
static void cl_commlib_app_message_queue_cleanup(cl_com_handle_t* handle);
177
/* global flags/variables */
179
/* cl_com_create_threads
180
* =====================
182
* If set to 0, no threads are used */
183
static cl_thread_mode_t cl_com_create_threads = CL_NO_THREAD;
186
/* global application function pointer for statistic calculation */
187
/* see cl_commlib_calculate_statistic() */
188
static pthread_mutex_t cl_com_application_mutex = PTHREAD_MUTEX_INITIALIZER;
189
static cl_app_status_func_t cl_com_application_status_func = NULL;
191
/* global application function pointer for errors */
192
/* JG: TODO: we don't need this mutex, just lock cl_com_application_error_list instead. */
193
static pthread_mutex_t cl_com_error_mutex = PTHREAD_MUTEX_INITIALIZER;
194
static cl_error_func_t cl_com_error_status_func = NULL;
196
/* global application function pointer for getting tag id names */
197
static pthread_mutex_t cl_com_tag_name_mutex = PTHREAD_MUTEX_INITIALIZER;
198
static cl_tag_name_func_t cl_com_tag_name_func = NULL;
200
/* global application function pointer for debug clients */
201
static pthread_mutex_t cl_com_debug_client_callback_func_mutex = PTHREAD_MUTEX_INITIALIZER;
202
static cl_app_debug_client_func_t cl_com_debug_client_callback_func = cl_com_default_application_debug_client_callback;
204
static pthread_mutex_t cl_com_ssl_setup_mutex = PTHREAD_MUTEX_INITIALIZER;
205
static cl_ssl_setup_t* cl_com_ssl_setup_config = NULL;
208
/* this is the offical commlib boolean parameter setup
209
TODO: merge all global settings into this structure */
210
typedef struct cl_com_global_settings_def {
211
cl_bool_t delayed_listen;
212
} cl_com_global_settings_t;
214
static pthread_mutex_t cl_com_global_settings_mutex = PTHREAD_MUTEX_INITIALIZER;
215
static cl_com_global_settings_t cl_com_global_settings = {CL_FALSE};
217
static int cl_message_list_append_send(cl_com_connection_t* c, cl_com_message_t* m, int l);
218
static int cl_message_list_remove_send(cl_com_connection_t* c, cl_com_message_t* m, int l);
219
static int cl_message_list_append_receive(cl_com_connection_t* c, cl_com_message_t* m, int l);
220
static int cl_message_list_remove_receive(cl_com_connection_t* c, cl_com_message_t* m, int l);
223
* Prevent these functions made inline by compiler. This is
224
* necessary for Solaris 10 dtrace pid provider to work.
227
#pragma no_inline(cl_message_list_append_send, cl_message_list_remove_send, cl_message_list_append_receive, cl_message_list_remove_receive)
230
static int cl_message_list_append_send(cl_com_connection_t* c, cl_com_message_t* m, int l)
232
return cl_message_list_append_message(c->send_message_list, m, l);
234
static int cl_message_list_remove_send(cl_com_connection_t* c, cl_com_message_t* m, int l)
236
return cl_message_list_remove_message(c->send_message_list, m, l);
238
static int cl_message_list_append_receive(cl_com_connection_t* c, cl_com_message_t* m, int l)
240
return cl_message_list_append_message(c->received_message_list, m, l);
242
static int cl_message_list_remove_receive(cl_com_connection_t* c, cl_com_message_t* m, int l)
244
return cl_message_list_remove_message(c->received_message_list, m, l);
248
#ifdef __CL_FUNCTION__
249
#undef __CL_FUNCTION__
251
#define __CL_FUNCTION__ "cl_com_get_host_list()"
252
cl_raw_list_t* cl_com_get_host_list(void) {
253
cl_raw_list_t* host_list = NULL;
254
pthread_mutex_lock(&cl_com_host_list_mutex);
255
host_list = cl_com_host_list;
256
pthread_mutex_unlock(&cl_com_host_list_mutex);
260
#ifdef __CL_FUNCTION__
261
#undef __CL_FUNCTION__
263
#define __CL_FUNCTION__ "cl_com_get_endpoint_list()"
264
cl_raw_list_t* cl_com_get_endpoint_list(void) {
265
cl_raw_list_t* endpoint_list = NULL;
266
pthread_mutex_lock(&cl_com_endpoint_list_mutex);
267
endpoint_list = cl_com_endpoint_list;
268
pthread_mutex_unlock(&cl_com_endpoint_list_mutex);
269
return endpoint_list;
274
#ifdef __CL_FUNCTION__
275
#undef __CL_FUNCTION__
277
#define __CL_FUNCTION__ "cl_com_get_log_list()"
278
cl_raw_list_t* cl_com_get_log_list(void) {
279
cl_raw_list_t* log_list = NULL;
280
pthread_mutex_lock(&cl_com_log_list_mutex);
281
log_list = cl_com_log_list;
282
pthread_mutex_unlock(&cl_com_log_list_mutex);
287
#ifdef __CL_FUNCTION__
288
#undef __CL_FUNCTION__
290
#define __CL_FUNCTION__ "cl_com_set_status_func()"
291
int cl_com_set_status_func(cl_app_status_func_t status_func) {
292
pthread_mutex_lock(&cl_com_application_mutex);
293
cl_com_application_status_func = *status_func;
294
pthread_mutex_unlock(&cl_com_application_mutex);
300
#ifdef __CL_FUNCTION__
301
#undef __CL_FUNCTION__
303
#define __CL_FUNCTION__ "cl_com_get_parameter_list_value()"
304
int cl_com_get_parameter_list_value(char* parameter, char** value) {
305
cl_parameter_list_elem_t* elem = NULL;
306
int retval = CL_RETVAL_UNKNOWN_PARAMETER;
308
if (parameter == NULL || value == NULL || *value != NULL) {
309
return CL_RETVAL_PARAMS;
312
pthread_mutex_lock(&cl_com_parameter_list_mutex);
313
cl_raw_list_lock(cl_com_parameter_list);
314
elem = cl_parameter_list_get_first_elem(cl_com_parameter_list);
315
while (elem != NULL) {
316
if (strcmp(elem->parameter,parameter) == 0) {
317
/* found matching element */
318
*value = strdup(elem->value);
319
if (*value == NULL) {
320
retval = CL_RETVAL_MALLOC;
322
retval = CL_RETVAL_OK;
326
elem = cl_parameter_list_get_next_elem(elem);
328
cl_raw_list_unlock(cl_com_parameter_list);
329
pthread_mutex_unlock(&cl_com_parameter_list_mutex);
333
#ifdef __CL_FUNCTION__
334
#undef __CL_FUNCTION__
336
#define __CL_FUNCTION__ "cl_com_get_parameter_list_string()"
337
int cl_com_get_parameter_list_string(char** param_string) {
338
int retval = CL_RETVAL_UNKNOWN_PARAMETER;
340
if (*param_string != NULL) {
341
return CL_RETVAL_PARAMS;
344
pthread_mutex_lock(&cl_com_parameter_list_mutex);
345
retval = cl_parameter_list_get_param_string(cl_com_parameter_list, param_string, 1);
346
pthread_mutex_unlock(&cl_com_parameter_list_mutex);
351
#ifdef __CL_FUNCTION__
352
#undef __CL_FUNCTION__
354
#define __CL_FUNCTION__ "cl_com_set_parameter_list_value()"
355
int cl_com_set_parameter_list_value(char* parameter, char* value) {
356
cl_parameter_list_elem_t* elem = NULL;
357
int retval = CL_RETVAL_UNKNOWN_PARAMETER;
359
if (parameter == NULL || value == NULL) {
360
return CL_RETVAL_PARAMS;
363
pthread_mutex_lock(&cl_com_parameter_list_mutex);
365
cl_raw_list_lock(cl_com_parameter_list);
366
elem = cl_parameter_list_get_first_elem(cl_com_parameter_list);
367
while (elem != NULL) {
368
if (strcmp(elem->parameter,parameter) == 0 ) {
369
/* found matching element */
370
if (elem->value != NULL) {
373
elem->value = strdup(value);
374
if (elem->value == NULL) {
375
retval = CL_RETVAL_MALLOC;
377
retval = CL_RETVAL_OK;
380
elem = cl_parameter_list_get_next_elem(elem);
382
if (retval == CL_RETVAL_UNKNOWN_PARAMETER) {
383
retval = cl_parameter_list_append_parameter(cl_com_parameter_list, parameter, value, 0);
385
cl_raw_list_unlock(cl_com_parameter_list);
386
pthread_mutex_unlock(&cl_com_parameter_list_mutex);
391
#ifdef __CL_FUNCTION__
392
#undef __CL_FUNCTION__
394
#define __CL_FUNCTION__ "cl_com_remove_parameter_list_value()"
395
int cl_com_remove_parameter_list_value(char* parameter) {
396
int retval = CL_RETVAL_OK;
397
pthread_mutex_lock(&cl_com_parameter_list_mutex);
398
retval = cl_parameter_list_remove_parameter(cl_com_parameter_list, parameter,1);
399
pthread_mutex_unlock(&cl_com_parameter_list_mutex);
403
#ifdef __CL_FUNCTION__
404
#undef __CL_FUNCTION__
406
#define __CL_FUNCTION__ "cl_com_update_parameter_list()"
407
int cl_com_update_parameter_list(char* parameter) {
408
int retval = CL_RETVAL_OK;
409
const char* param_token = NULL;
410
struct saved_vars_s* context = NULL;
412
cl_com_set_parameter_list_value("gdi_timeout", "60");
413
cl_com_set_parameter_list_value("gdi_retries", "0");
414
cl_com_set_parameter_list_value("cl_ping", "false");
418
param_token = sge_strtok_r(parameter, ",; ", &context);
420
/*overriding the default values with values found in qmaster_params*/
421
while (param_token != NULL) {
422
if (strstr(param_token, "gdi_timeout") || strstr(param_token, "gdi_retries") || strstr(param_token, "cl_ping")) {
423
char* sub_token1 = NULL;
424
char* sub_token2 = NULL;
425
struct saved_vars_s* context2 = NULL;
426
sub_token1 = sge_strtok_r(param_token, "=", &context2);
427
sub_token2 = sge_strtok_r(NULL, "=", &context2);
429
if (sub_token2 != NULL) {
430
if (strstr(sub_token1, "gdi_timeout") || strstr(sub_token1, "gdi_retries")) {
431
if (sge_str_is_number(sub_token2)) {
432
cl_com_set_parameter_list_value(sub_token1, sub_token2);
435
if (strstr(sub_token1, "cl_ping")) {
436
if ((strncasecmp(sub_token2, "true", sizeof("true")-1) == 0 &&
437
strlen(sub_token2) == sizeof("true")-1) ||
438
((strncasecmp(sub_token2, "false", sizeof("false")-1) == 0) &&
439
strlen(sub_token2) == sizeof("false")-1)){
440
cl_com_set_parameter_list_value(sub_token1, sub_token2);
445
sge_free_saved_vars(context2);
447
param_token = sge_strtok_r(NULL, ",; ", &context);
449
sge_free_saved_vars(context);
454
/* The only errors supported by error func calls are:
455
CL_RETVAL_UNKNOWN: when CRM contains unexpected error
456
CL_RETVAL_ACCESS_DENIED: when CRM says that access to service is denied
457
CL_CRM_CS_ENDPOINT_NOT_UNIQUE: when CRM says that there is already an endpoint with this id connected */
458
#ifdef __CL_FUNCTION__
459
#undef __CL_FUNCTION__
461
#define __CL_FUNCTION__ "cl_com_set_error_func()"
462
int cl_com_set_error_func(cl_error_func_t error_func) {
463
pthread_mutex_lock(&cl_com_error_mutex);
464
cl_com_error_status_func = *error_func;
465
pthread_mutex_unlock(&cl_com_error_mutex);
470
#ifdef __CL_FUNCTION__
471
#undef __CL_FUNCTION__
473
#define __CL_FUNCTION__ "cl_com_set_tag_name_func()"
474
int cl_com_set_tag_name_func(cl_tag_name_func_t tag_name_func) {
475
pthread_mutex_lock(&cl_com_tag_name_mutex);
476
cl_com_tag_name_func = *tag_name_func;
477
pthread_mutex_unlock(&cl_com_tag_name_mutex);
482
#ifdef __CL_FUNCTION__
483
#undef __CL_FUNCTION__
485
#define __CL_FUNCTION__ "cl_com_setup_callback_functions()"
486
int cl_com_setup_callback_functions(cl_com_connection_t* connection) {
487
if (connection == NULL) {
488
return CL_RETVAL_PARAMS;
491
/* set global error function from application */
492
pthread_mutex_lock(&cl_com_error_mutex);
493
connection->error_func = cl_com_error_status_func;
494
pthread_mutex_unlock(&cl_com_error_mutex);
496
/* set global tag name function from application */
497
pthread_mutex_lock(&cl_com_tag_name_mutex);
498
connection->tag_name_func = cl_com_tag_name_func;
499
pthread_mutex_unlock(&cl_com_tag_name_mutex);
505
#ifdef __CL_FUNCTION__
506
#undef __CL_FUNCTION__
508
#define __CL_FUNCTION__ "cl_commlib_push_application_error()"
509
int cl_commlib_push_application_error(cl_log_t cl_err_type, int cl_error, const char* cl_info_text) {
510
const char* cl_info = cl_info_text;
511
int retval = CL_RETVAL_OK;
513
if (cl_info == NULL) {
514
cl_info = MSG_CL_COMMLIB_NO_ADDITIONAL_INFO;
515
retval = CL_RETVAL_PARAMS;
518
pthread_mutex_lock(&cl_com_error_mutex);
519
if (cl_com_error_status_func != NULL) {
520
CL_LOG_STR(CL_LOG_INFO,"add application error id: ", cl_get_error_text(cl_error));
521
CL_LOG_STR(CL_LOG_INFO,"add application error: ", cl_info );
522
cl_application_error_list_push_error(cl_com_application_error_list, cl_err_type, cl_error, cl_info, 1);
524
retval = CL_RETVAL_UNKNOWN;
525
CL_LOG(CL_LOG_ERROR,"no application error function set" );
526
CL_LOG_STR(CL_LOG_ERROR,"ignore application error id: ", cl_get_error_text(cl_error));
527
CL_LOG_STR(CL_LOG_ERROR,"ignore application error: ", cl_info );
529
pthread_mutex_unlock(&cl_com_error_mutex);
533
#ifdef __CL_FUNCTION__
534
#undef __CL_FUNCTION__
536
#define __CL_FUNCTION__ "cl_commlib_check_callback_functions()"
537
static int cl_commlib_check_callback_functions(void) {
539
* This function will call application callback functions in context
540
* of application. This happens when application is calling
541
* cl_commlib_send_message(), cl_commlib_receive_message() and
542
* cl_com_cleanup_commlib(), because
543
* this is the only chance to be in the application context (thread).
546
cl_thread_settings_t* actual_thread = NULL;
547
cl_bool_t is_commlib_thread = CL_FALSE;
549
switch(cl_com_create_threads) {
553
actual_thread = cl_thread_get_thread_config();
554
if (actual_thread != NULL) {
555
if (actual_thread->thread_pointer != NULL) {
556
is_commlib_thread = CL_TRUE;
562
/* check if caller is application (thread) */
563
if ( is_commlib_thread == CL_FALSE ) {
564
/* here we are in application context, we can call trigger functions */
565
cl_application_error_list_elem_t* elem = NULL;
566
pthread_mutex_lock(&cl_com_error_mutex);
567
cl_raw_list_lock(cl_com_application_error_list);
568
while( (elem = cl_application_error_list_get_first_elem(cl_com_application_error_list)) != NULL ) {
569
cl_raw_list_remove_elem(cl_com_application_error_list, elem->raw_elem);
571
/* now trigger application error func */
572
if (cl_com_error_status_func != NULL) {
573
CL_LOG(CL_LOG_INFO,"triggering application error function");
574
cl_com_error_status_func(elem);
576
CL_LOG(CL_LOG_WARNING,"can't trigger application error function: no function set");
583
cl_raw_list_unlock(cl_com_application_error_list);
584
pthread_mutex_unlock(&cl_com_error_mutex);
592
/****** cl_commlib/cl_com_setup_commlib_complete() *****************************
594
* cl_com_setup_commlib_complete() -- check whether commlib setup was called
597
* cl_bool_t cl_com_setup_commlib_complete(void)
600
* This function returns CL_TRUE when cl_com_setup_commlib() was called
604
* cl_bool_t - CL_TRUE: cl_com_setup_commlib() was done
605
* CL_FALSE: There was no commlib setup till now
608
* MT-NOTE: cl_com_setup_commlib_complete() is MT safe
611
* cl_commlib/cl_com_setup_commlib()
612
*******************************************************************************/
613
#ifdef __CL_FUNCTION__
614
#undef __CL_FUNCTION__
616
#define __CL_FUNCTION__ "cl_com_setup_commlib_complete()"
617
cl_bool_t cl_com_setup_commlib_complete(void) {
618
cl_bool_t setup_complete = CL_FALSE;
620
pthread_mutex_lock(&cl_com_log_list_mutex);
621
if (cl_com_log_list != NULL) {
622
setup_complete = CL_TRUE;
624
pthread_mutex_unlock(&cl_com_log_list_mutex);
625
return setup_complete;
628
/* the cl_com_get_(un)resolveable_hosts functions don't need a mutex,
629
* because the memory is malloced in cl_com_setup_commlib()
630
* and freed in cl_com_cleanup_commlib()
632
char* cl_com_get_resolvable_hosts(void) {
633
return cl_commlib_debug_resolvable_hosts;
636
char* cl_com_get_unresolvable_hosts(void) {
637
return cl_commlib_debug_unresolvable_hosts;
640
#ifdef __CL_FUNCTION__
641
#undef __CL_FUNCTION__
643
#define __CL_FUNCTION__ "cl_com_setup_commlib()"
644
int cl_com_setup_commlib(cl_thread_mode_t t_mode, cl_log_t debug_level, cl_log_func_t flush_func)
646
int ret_val = CL_RETVAL_OK;
647
cl_thread_settings_t* thread_p = NULL;
648
cl_bool_t duplicate_call = CL_FALSE;
649
cl_bool_t different_thread_mode = CL_FALSE;
652
/* setup global log list */
653
pthread_mutex_lock(&cl_com_log_list_mutex);
654
help = getenv("SGE_COMMLIB_DEBUG_RESOLVE");
656
if (cl_commlib_debug_resolvable_hosts == NULL) {
657
cl_commlib_debug_resolvable_hosts = strdup(help);
660
help = getenv("SGE_COMMLIB_DEBUG_NO_RESOLVE");
662
if (cl_commlib_debug_unresolvable_hosts == NULL) {
663
cl_commlib_debug_unresolvable_hosts = strdup(help);
667
if (cl_com_log_list != NULL) {
668
duplicate_call = CL_TRUE;
669
if (cl_com_handle_list != NULL) {
670
if (cl_raw_list_get_elem_count(cl_com_handle_list) > 0) {
671
if (cl_com_create_threads != t_mode) {
672
different_thread_mode = CL_TRUE;
678
if (cl_com_log_list == NULL) {
679
#ifdef CL_DO_COMMLIB_DEBUG
680
ret_val = cl_log_list_setup(&cl_com_log_list, "main", 0, /* CL_LOG_FLUSHED */ CL_LOG_IMMEDIATE , NULL );
682
ret_val = cl_log_list_setup(&cl_com_log_list, "main", 0, /* CL_LOG_FLUSHED */ CL_LOG_IMMEDIATE , flush_func);
684
if (cl_com_log_list == NULL) {
685
pthread_mutex_unlock(&cl_com_log_list_mutex);
686
cl_com_cleanup_commlib();
690
pthread_mutex_unlock(&cl_com_log_list_mutex);
691
cl_log_list_set_log_level(cl_com_log_list, debug_level );
693
if (duplicate_call == CL_TRUE) {
694
CL_LOG(CL_LOG_WARNING,"duplicate call to cl_com_setup_commlib()");
697
if (different_thread_mode == CL_TRUE) {
698
CL_LOG(CL_LOG_ERROR,"duplicate call to cl_com_setup_commlib() with different thread mode");
699
cl_commlib_push_application_error(CL_LOG_ERROR, CL_RETVAL_COMMLIB_SETUP_ALREADY_CALLED, MSG_CL_COMMLIB_CANT_SWITCH_THREAD_MODE_WITH_EXISTING_HANDLES);
701
cl_com_create_threads = t_mode;
704
/* setup global application error list */
705
pthread_mutex_lock(&cl_com_application_error_list_mutex);
706
if (cl_com_application_error_list == NULL) {
707
ret_val = cl_application_error_list_setup(&cl_com_application_error_list, "application errors");
708
if (cl_com_application_error_list == NULL) {
709
pthread_mutex_unlock(&cl_com_application_error_list_mutex);
710
cl_com_cleanup_commlib();
714
pthread_mutex_unlock(&cl_com_application_error_list_mutex);
716
/* setup ssl framework */
717
ret_val = cl_com_ssl_framework_setup();
718
if (ret_val != CL_RETVAL_OK) {
719
cl_com_cleanup_commlib();
724
/* setup global cl_com_handle_list */
725
pthread_mutex_lock(&cl_com_handle_list_mutex);
726
if (cl_com_handle_list == NULL) {
727
ret_val = cl_handle_list_setup(&cl_com_handle_list,"handle list");
728
if (cl_com_handle_list == NULL) {
729
pthread_mutex_unlock(&cl_com_handle_list_mutex);
730
cl_com_cleanup_commlib();
734
pthread_mutex_unlock(&cl_com_handle_list_mutex);
736
/* setup global host list */
737
pthread_mutex_lock(&cl_com_host_list_mutex);
738
if (cl_com_host_list == NULL) {
739
ret_val = cl_host_list_setup(&cl_com_host_list, "global_host_cache", CL_SHORT, NULL , NULL, 0 , 0, 0, CL_TRUE);
740
if (cl_com_host_list == NULL) {
741
pthread_mutex_unlock(&cl_com_host_list_mutex);
742
cl_com_cleanup_commlib();
746
pthread_mutex_unlock(&cl_com_host_list_mutex);
748
/* setup global endpoint list */
749
pthread_mutex_lock(&cl_com_endpoint_list_mutex);
750
if (cl_com_endpoint_list == NULL) {
751
ret_val = cl_endpoint_list_setup(&cl_com_endpoint_list, "global_endpoint_list" , 0 , 0, CL_TRUE);
752
if (cl_com_endpoint_list == NULL) {
753
pthread_mutex_unlock(&cl_com_endpoint_list_mutex);
754
cl_com_cleanup_commlib();
758
pthread_mutex_unlock(&cl_com_endpoint_list_mutex);
760
/* setup global parameter list */
761
pthread_mutex_lock(&cl_com_parameter_list_mutex);
762
if (cl_com_parameter_list == NULL) {
763
ret_val = cl_parameter_list_setup(&cl_com_parameter_list, "global_parameter_list");
764
if (cl_com_parameter_list == NULL) {
765
pthread_mutex_unlock(&cl_com_parameter_list_mutex);
766
cl_com_cleanup_commlib();
770
pthread_mutex_unlock(&cl_com_parameter_list_mutex);
772
/* setup global thread list */
773
pthread_mutex_lock(&cl_com_thread_list_mutex);
774
switch(cl_com_create_threads) {
776
CL_LOG(CL_LOG_INFO,"no threads enabled");
780
if (cl_com_thread_list == NULL) {
781
ret_val = cl_thread_list_setup(&cl_com_thread_list,"global_thread_list");
782
if (cl_com_thread_list == NULL) {
783
pthread_mutex_unlock(&cl_com_thread_list_mutex);
784
CL_LOG(CL_LOG_ERROR,"could not setup thread list");
785
cl_com_cleanup_commlib();
788
CL_LOG(CL_LOG_INFO,"starting trigger thread ...");
789
ret_val = cl_thread_list_create_thread(cl_com_thread_list,
792
"trigger_thread", 1, cl_com_trigger_thread, NULL, NULL);
793
if (ret_val != CL_RETVAL_OK) {
794
pthread_mutex_unlock(&cl_com_thread_list_mutex);
795
CL_LOG(CL_LOG_ERROR,"could not start trigger_thread");
796
cl_com_cleanup_commlib();
803
pthread_mutex_unlock(&cl_com_thread_list_mutex);
805
CL_LOG(CL_LOG_INFO,"ngc library setup done");
806
cl_commlib_check_callback_functions();
808
if (different_thread_mode == CL_TRUE) {
809
return CL_RETVAL_COMMLIB_SETUP_ALREADY_CALLED;
816
#ifdef __CL_FUNCTION__
817
#undef __CL_FUNCTION__
819
#define __CL_FUNCTION__ "cl_com_cleanup_commlib()"
820
int cl_com_cleanup_commlib(void) {
821
int ret_val = CL_RETVAL_OK;
822
cl_thread_settings_t* thread_p = NULL;
823
cl_handle_list_elem_t* elem = NULL;
825
/* lock handle list mutex */
826
pthread_mutex_lock(&cl_com_handle_list_mutex);
828
if (cl_com_handle_list == NULL) {
829
pthread_mutex_unlock(&cl_com_handle_list_mutex);
830
/* cleanup already called or cl_com_setup_commlib() was not called */
831
return CL_RETVAL_PARAMS;
834
CL_LOG(CL_LOG_INFO,"cleanup commlib ...");
836
cl_commlib_check_callback_functions(); /* flush all callbacks to application */
838
/* shutdown all connection handle objects (and threads) */
839
while ( (elem = cl_handle_list_get_first_elem(cl_com_handle_list)) != NULL) {
840
cl_commlib_shutdown_handle(elem->handle,CL_FALSE);
843
CL_LOG(CL_LOG_INFO,"cleanup thread list ...");
844
/* cleanup global thread list */
845
pthread_mutex_lock(&cl_com_thread_list_mutex);
846
switch(cl_com_create_threads) {
848
CL_LOG(CL_LOG_INFO,"no threads enabled");
851
CL_LOG(CL_LOG_INFO,"shutdown trigger thread ...");
852
ret_val = cl_thread_list_delete_thread_by_id(cl_com_thread_list, 1);
853
if (ret_val != CL_RETVAL_OK) {
854
CL_LOG_STR(CL_LOG_ERROR,"error shutting down trigger thread", cl_get_error_text(ret_val));
856
CL_LOG(CL_LOG_INFO,"shutdown trigger thread OK");
861
while( (thread_p=cl_thread_list_get_first_thread(cl_com_thread_list)) != NULL) {
862
CL_LOG(CL_LOG_ERROR,"cleanup of threads did not shutdown all threads ...");
863
cl_thread_list_delete_thread(cl_com_thread_list, thread_p);
866
cl_thread_list_cleanup(&cl_com_thread_list);
868
cl_thread_cleanup_global_thread_config_key();
870
pthread_mutex_unlock(&cl_com_thread_list_mutex);
872
CL_LOG(CL_LOG_INFO,"cleanup thread list done");
875
CL_LOG(CL_LOG_INFO,"cleanup handle list ...");
877
/* cleanup global cl_com_handle_list */
878
cl_handle_list_cleanup(&cl_com_handle_list);
879
pthread_mutex_unlock(&cl_com_handle_list_mutex);
882
CL_LOG(CL_LOG_INFO,"cleanup endpoint list ...");
883
pthread_mutex_lock(&cl_com_endpoint_list_mutex);
884
cl_endpoint_list_cleanup(&cl_com_endpoint_list);
885
pthread_mutex_unlock(&cl_com_endpoint_list_mutex);
887
CL_LOG(CL_LOG_INFO,"cleanup host list ...");
888
pthread_mutex_lock(&cl_com_host_list_mutex);
889
cl_host_list_cleanup(&cl_com_host_list);
890
pthread_mutex_unlock(&cl_com_host_list_mutex);
892
CL_LOG(CL_LOG_INFO,"cleanup parameter list ...");
893
pthread_mutex_lock(&cl_com_parameter_list_mutex);
894
cl_parameter_list_cleanup(&cl_com_parameter_list);
895
pthread_mutex_unlock(&cl_com_parameter_list_mutex);
897
CL_LOG(CL_LOG_INFO,"cleanup ssl framework configuration object ...");
898
cl_com_ssl_framework_cleanup();
900
CL_LOG(CL_LOG_INFO,"cleanup application error list ...");
901
pthread_mutex_lock(&cl_com_application_error_list_mutex);
902
cl_application_error_list_cleanup(&cl_com_application_error_list);
903
pthread_mutex_unlock(&cl_com_application_error_list_mutex);
905
CL_LOG(CL_LOG_INFO,"cleanup log list ...");
906
/* cleanup global cl_com_log_list */
907
pthread_mutex_lock(&cl_com_log_list_mutex);
908
if (cl_commlib_debug_resolvable_hosts != NULL) {
909
free(cl_commlib_debug_resolvable_hosts);
910
cl_commlib_debug_resolvable_hosts = NULL;
912
if (cl_commlib_debug_unresolvable_hosts != NULL) {
913
free(cl_commlib_debug_unresolvable_hosts);
914
cl_commlib_debug_unresolvable_hosts = NULL;
916
cl_log_list_cleanup(&cl_com_log_list);
917
pthread_mutex_unlock(&cl_com_log_list_mutex);
924
/*TODO check this function , check locking */
925
#ifdef __CL_FUNCTION__
926
#undef __CL_FUNCTION__
928
#define __CL_FUNCTION__ "cl_commlib_set_connection_param()"
929
int cl_commlib_set_connection_param(cl_com_handle_t* handle, int parameter, int value) {
930
if (handle != NULL) {
932
case HEARD_FROM_TIMEOUT:
933
handle->last_heard_from_timeout = value;
934
/* add a few seconds to actual HEARD_FROM_TIMEOUT, because we want to be sure that
935
qmaster will find the execd when try to send a job at end of HEARD_FROM_TIMEOUT intervall */
936
cl_endpoint_list_set_entry_life_time(cl_com_get_endpoint_list(), value + handle->open_connection_timeout );
942
/*TODO check this function , check locking*/
943
#ifdef __CL_FUNCTION__
944
#undef __CL_FUNCTION__
946
#define __CL_FUNCTION__ "cl_commlib_get_connection_param()"
947
int cl_commlib_get_connection_param(cl_com_handle_t* handle, int parameter, int* value) {
948
if (handle != NULL) {
950
case HEARD_FROM_TIMEOUT:
951
*value = handle->last_heard_from_timeout;
958
#ifdef __CL_FUNCTION__
959
#undef __CL_FUNCTION__
961
#define __CL_FUNCTION__ "cl_commlib_set_global_param()"
962
int cl_commlib_set_global_param(cl_global_settings_params_t parameter, cl_bool_t value) {
963
pthread_mutex_lock(&cl_com_global_settings_mutex);
965
case CL_COMMLIB_DELAYED_LISTEN: {
966
cl_com_global_settings.delayed_listen = value;
970
pthread_mutex_unlock(&cl_com_global_settings_mutex);
974
#ifdef __CL_FUNCTION__
975
#undef __CL_FUNCTION__
977
#define __CL_FUNCTION__ "cl_commlib_get_global_param()"
978
cl_bool_t cl_commlib_get_global_param(cl_global_settings_params_t parameter) {
979
cl_bool_t retval = CL_FALSE;
980
pthread_mutex_lock(&cl_com_global_settings_mutex);
982
case CL_COMMLIB_DELAYED_LISTEN: {
983
retval = cl_com_global_settings.delayed_listen;
987
pthread_mutex_unlock(&cl_com_global_settings_mutex);
992
#ifdef __CL_FUNCTION__
993
#undef __CL_FUNCTION__
995
#define __CL_FUNCTION__ "cl_com_specify_ssl_configuration()"
996
int cl_com_specify_ssl_configuration(cl_ssl_setup_t* new_config) {
997
int ret_val = CL_RETVAL_OK;
999
pthread_mutex_lock(&cl_com_ssl_setup_mutex);
1000
if (cl_com_ssl_setup_config != NULL) {
1001
CL_LOG(CL_LOG_INFO,"resetting ssl setup configuration");
1002
cl_com_free_ssl_setup(&cl_com_ssl_setup_config);
1004
CL_LOG(CL_LOG_INFO,"setting ssl setup configuration");
1006
ret_val = cl_com_dup_ssl_setup(&cl_com_ssl_setup_config, new_config);
1007
if (ret_val != CL_RETVAL_OK) {
1008
CL_LOG_STR(CL_LOG_WARNING, "Cannot set ssl setup configuration! Reason:", cl_get_error_text(ret_val));
1010
pthread_mutex_unlock(&cl_com_ssl_setup_mutex);
1015
#ifdef __CL_FUNCTION__
1016
#undef __CL_FUNCTION__
1018
#define __CL_FUNCTION__ "cl_com_create_handle()"
1019
cl_com_handle_t* cl_com_create_handle(int* commlib_error,
1020
cl_framework_t framework,
1021
cl_xml_connection_type_t data_flow_type,
1022
cl_bool_t service_provider,
1024
cl_tcp_connect_t tcp_connect_mode,
1025
char* component_name, unsigned long component_id,
1026
int select_sec_timeout, int select_usec_timeout) {
1027
int thread_start_error = 0;
1028
cl_com_handle_t* new_handle = NULL;
1029
int return_value = CL_RETVAL_OK;
1031
int full_usec_seconds = 0;
1033
char help_buffer[80];
1034
char* local_hostname = NULL;
1035
struct in_addr local_addr;
1036
cl_handle_list_elem_t* elem = NULL;
1038
struct rlimit64 application_rlimits;
1040
struct rlimit application_rlimits;
1043
cl_commlib_check_callback_functions();
1045
if (cl_com_handle_list == NULL) {
1046
CL_LOG(CL_LOG_ERROR,"cl_com_setup_commlib() not called");
1047
cl_commlib_push_application_error(CL_LOG_ERROR, CL_RETVAL_NO_FRAMEWORK_INIT, NULL);
1048
if (commlib_error) {
1049
*commlib_error = CL_RETVAL_NO_FRAMEWORK_INIT;
1054
if (component_name == NULL ) {
1055
CL_LOG(CL_LOG_ERROR,"component name is NULL");
1056
cl_commlib_push_application_error(CL_LOG_ERROR, CL_RETVAL_PARAMS, NULL);
1057
if (commlib_error) {
1058
*commlib_error = CL_RETVAL_PARAMS;
1063
if ( service_provider == CL_TRUE && component_id == 0) {
1064
CL_LOG(CL_LOG_ERROR,"service can't use component id 0");
1065
cl_commlib_push_application_error(CL_LOG_ERROR, CL_RETVAL_PARAMS, NULL);
1066
if (commlib_error) {
1067
*commlib_error = CL_RETVAL_PARAMS;
1072
/* first lock handle list */
1073
cl_raw_list_lock(cl_com_handle_list);
1075
elem = cl_handle_list_get_first_elem(cl_com_handle_list);
1076
while ( elem != NULL) {
1077
cl_com_endpoint_t* local_endpoint = elem->handle->local;
1079
if (local_endpoint->comp_id == component_id) {
1080
if (strcmp(local_endpoint->comp_name, component_name) == 0) {
1081
/* we have this handle already in list */
1082
CL_LOG(CL_LOG_ERROR,"component not unique");
1083
cl_commlib_push_application_error(CL_LOG_ERROR, CL_RETVAL_LOCAL_ENDPOINT_NOT_UNIQUE, NULL);
1084
cl_raw_list_unlock(cl_com_handle_list);
1085
if (commlib_error) {
1086
*commlib_error = CL_RETVAL_LOCAL_ENDPOINT_NOT_UNIQUE;
1091
elem = cl_handle_list_get_next_elem(elem);
1094
return_value = cl_com_gethostname(&local_hostname, &local_addr, NULL, NULL);
1095
if (return_value != CL_RETVAL_OK) {
1096
CL_LOG(CL_LOG_ERROR,cl_get_error_text(return_value));
1097
cl_commlib_push_application_error(CL_LOG_ERROR, return_value, NULL);
1098
cl_raw_list_unlock(cl_com_handle_list);
1099
if (commlib_error) {
1100
*commlib_error = return_value;
1104
CL_LOG_STR(CL_LOG_INFO,"local host name is",local_hostname );
1107
new_handle = (cl_com_handle_t*) malloc(sizeof(cl_com_handle_t));
1108
if (new_handle == NULL) {
1109
free(local_hostname);
1110
CL_LOG(CL_LOG_ERROR,"malloc() error");
1111
cl_commlib_push_application_error(CL_LOG_ERROR, CL_RETVAL_MALLOC, NULL);
1112
cl_raw_list_unlock(cl_com_handle_list);
1113
if (commlib_error) {
1114
*commlib_error = CL_RETVAL_MALLOC;
1119
/* setup SSL configuration */
1120
new_handle->ssl_setup = NULL;
1122
case CL_CT_UNDEFINED:
1126
pthread_mutex_lock(&cl_com_ssl_setup_mutex);
1127
if (cl_com_ssl_setup_config == NULL) {
1128
CL_LOG(CL_LOG_ERROR,"use cl_com_specify_ssl_configuration() to specify a ssl configuration");
1129
free(local_hostname);
1131
cl_raw_list_unlock(cl_com_handle_list);
1132
if (commlib_error) {
1133
*commlib_error = CL_RETVAL_NO_FRAMEWORK_INIT;
1135
pthread_mutex_unlock(&cl_com_ssl_setup_mutex);
1136
cl_commlib_push_application_error(CL_LOG_ERROR, CL_RETVAL_NO_FRAMEWORK_INIT, NULL);
1140
if ((return_value = cl_com_dup_ssl_setup(&(new_handle->ssl_setup), cl_com_ssl_setup_config)) != CL_RETVAL_OK) {
1141
free(local_hostname);
1143
cl_raw_list_unlock(cl_com_handle_list);
1144
if (commlib_error) {
1145
*commlib_error = return_value;
1147
pthread_mutex_unlock(&cl_com_ssl_setup_mutex);
1148
cl_commlib_push_application_error(CL_LOG_ERROR, return_value, NULL);
1152
pthread_mutex_unlock(&cl_com_ssl_setup_mutex);
1157
usec_rest = select_usec_timeout % 1000000; /* usec parameter for select should not be > 1000000 !!!*/
1158
full_usec_seconds = select_usec_timeout / 1000000; /* full seconds from timeout_val_usec parameter */
1159
sec_param = select_sec_timeout + full_usec_seconds; /* add full seconds from usec parameter to timeout_val_sec parameter */
1163
new_handle->local = NULL;
1164
new_handle->tcp_connect_mode = tcp_connect_mode;
1166
new_handle->debug_client_setup = NULL;
1167
if ((return_value = cl_com_create_debug_client_setup(&(new_handle->debug_client_setup),CL_DEBUG_CLIENT_OFF, CL_TRUE, 0)) != CL_RETVAL_OK) {
1168
CL_LOG(CL_LOG_ERROR,"can't setup debug client structure");
1169
free(local_hostname);
1171
cl_raw_list_unlock(cl_com_handle_list);
1172
if (commlib_error) {
1173
*commlib_error = CL_RETVAL_NO_FRAMEWORK_INIT;
1175
cl_commlib_push_application_error(CL_LOG_ERROR, CL_RETVAL_NO_FRAMEWORK_INIT, NULL);
1179
new_handle->messages_ready_for_read = 0;
1180
new_handle->messages_ready_mutex = NULL;
1181
new_handle->connection_list_mutex = NULL;
1182
new_handle->connection_list = NULL;
1183
new_handle->received_message_queue = NULL;
1184
new_handle->send_message_queue = NULL;
1185
new_handle->next_free_client_id = 1;
1186
new_handle->service_handler = NULL;
1187
new_handle->framework = framework;
1188
new_handle->data_flow_type = data_flow_type;
1189
new_handle->service_provider = service_provider;
1190
if ( new_handle->service_provider == CL_TRUE) {
1191
/* we are service provider */
1192
new_handle->connect_port = 0;
1193
if ( handle_port == 0 ) {
1194
new_handle->service_port = 0;
1195
CL_LOG(CL_LOG_WARNING,"no port specified, using next free port");
1197
new_handle->service_port = handle_port;
1200
/* we are client, use port for connect */
1201
new_handle->connect_port = handle_port;
1202
new_handle->service_port = 0;
1204
new_handle->connection_timeout = CL_DEFINE_CLIENT_CONNECTION_LIFETIME;
1205
new_handle->last_heard_from_timeout = CL_DEFINE_CLIENT_CONNECTION_LIFETIME; /* don't use should be removed */
1206
new_handle->read_timeout = CL_DEFINE_READ_TIMEOUT;
1207
new_handle->write_timeout = CL_DEFINE_WRITE_TIMEOUT;
1208
new_handle->open_connection_timeout = CL_DEFINE_GET_CLIENT_CONNECTION_DATA_TIMEOUT;
1209
new_handle->close_connection_timeout = CL_DEFINE_DELETE_MESSAGES_TIMEOUT_AFTER_CCRM;
1210
new_handle->acknowledge_timeout = CL_DEFINE_ACK_TIMEOUT;
1211
new_handle->message_timeout = CL_DEFINE_MESSAGE_TIMEOUT;
1212
new_handle->select_sec_timeout = sec_param;
1213
new_handle->select_usec_timeout = usec_rest;
1214
new_handle->synchron_receive_timeout = CL_DEFINE_SYNCHRON_RECEIVE_TIMEOUT; /* default from old commlib */
1215
new_handle->do_shutdown = 0; /* no shutdown till now */
1216
new_handle->max_connection_count_reached = CL_FALSE;
1217
new_handle->max_connection_count_found_connection_to_close = CL_FALSE;
1218
new_handle->allowed_host_list = NULL;
1220
new_handle->auto_close_mode = CL_CM_AC_DISABLED;
1223
getrlimit64(RLIMIT_NOFILE, &application_rlimits);
1225
getrlimit(RLIMIT_NOFILE, &application_rlimits);
1228
new_handle->max_open_connections = (unsigned long) application_rlimits.rlim_cur;
1230
if (FD_SETSIZE < new_handle->max_open_connections) {
1231
CL_LOG(CL_LOG_ERROR,"FD_SETSIZE < file descriptor limit");
1232
new_handle->max_open_connections = FD_SETSIZE - 1;
1235
if ( new_handle->max_open_connections < 32 ) {
1236
CL_LOG_INT(CL_LOG_ERROR, "to less file descriptors:", (int)new_handle->max_open_connections );
1238
free(local_hostname);
1239
cl_raw_list_unlock(cl_com_handle_list);
1240
if (commlib_error) {
1241
*commlib_error = CL_RETVAL_TO_LESS_FILEDESCRIPTORS;
1243
cl_commlib_push_application_error(CL_LOG_ERROR, CL_RETVAL_TO_LESS_FILEDESCRIPTORS, NULL);
1246
CL_LOG_INT(CL_LOG_INFO, "max file descriptors on this system :", (int)new_handle->max_open_connections);
1248
new_handle->max_open_connections = new_handle->max_open_connections - 20;
1250
CL_LOG_INT(CL_LOG_INFO, "max. used descriptors for communication:", (int)new_handle->max_open_connections);
1252
new_handle->max_con_close_mode = CL_ON_MAX_COUNT_OFF;
1253
new_handle->app_condition = NULL;
1254
new_handle->read_condition = NULL;
1255
new_handle->write_condition = NULL;
1256
new_handle->service_thread = NULL;
1257
new_handle->read_thread = NULL;
1258
new_handle->write_thread = NULL;
1261
new_handle->statistic = (cl_com_handle_statistic_t*)malloc(sizeof(cl_com_handle_statistic_t));
1262
if (new_handle->statistic == NULL) {
1264
free(local_hostname);
1265
cl_raw_list_unlock(cl_com_handle_list);
1266
if (commlib_error) {
1267
*commlib_error = CL_RETVAL_MALLOC;
1269
cl_commlib_push_application_error(CL_LOG_ERROR, CL_RETVAL_MALLOC, NULL);
1272
memset(new_handle->statistic, 0, sizeof(cl_com_handle_statistic_t));
1274
gettimeofday(&(new_handle->statistic->last_update),NULL);
1275
gettimeofday(&(new_handle->start_time),NULL);
1277
new_handle->messages_ready_mutex = (pthread_mutex_t*)malloc(sizeof(pthread_mutex_t));
1278
if (new_handle->messages_ready_mutex == NULL) {
1279
cl_com_free_handle_statistic(&(new_handle->statistic));
1281
free(local_hostname);
1282
cl_raw_list_unlock(cl_com_handle_list);
1283
if (commlib_error) {
1284
*commlib_error = CL_RETVAL_MALLOC;
1286
cl_commlib_push_application_error(CL_LOG_ERROR, CL_RETVAL_MALLOC, NULL);
1290
new_handle->connection_list_mutex = (pthread_mutex_t*)malloc(sizeof(pthread_mutex_t));
1291
if (new_handle->connection_list_mutex == NULL) {
1292
free(new_handle->messages_ready_mutex);
1293
cl_com_free_handle_statistic(&(new_handle->statistic));
1295
free(local_hostname);
1296
cl_raw_list_unlock(cl_com_handle_list);
1297
if (commlib_error) {
1298
*commlib_error = CL_RETVAL_MALLOC;
1300
cl_commlib_push_application_error(CL_LOG_ERROR, CL_RETVAL_MALLOC, NULL);
1305
if (pthread_mutex_init(new_handle->messages_ready_mutex, NULL) != 0) {
1306
int mutex_ret_val = pthread_mutex_destroy(new_handle->messages_ready_mutex);
1307
if (mutex_ret_val != EBUSY) {
1308
free(new_handle->messages_ready_mutex);
1310
free(new_handle->connection_list_mutex);
1311
cl_com_free_handle_statistic(&(new_handle->statistic));
1313
free(local_hostname);
1314
cl_raw_list_unlock(cl_com_handle_list);
1315
if (commlib_error) {
1316
*commlib_error = CL_RETVAL_MUTEX_ERROR;
1318
cl_commlib_push_application_error(CL_LOG_ERROR, CL_RETVAL_MUTEX_ERROR, NULL);
1322
if (pthread_mutex_init(new_handle->connection_list_mutex, NULL) != 0) {
1324
mutex_ret_val = pthread_mutex_destroy(new_handle->connection_list_mutex);
1325
if (mutex_ret_val != EBUSY) {
1326
free(new_handle->connection_list_mutex);
1328
mutex_ret_val = pthread_mutex_destroy(new_handle->messages_ready_mutex);
1329
if (mutex_ret_val != EBUSY) {
1330
free(new_handle->messages_ready_mutex);
1332
cl_com_free_handle_statistic(&(new_handle->statistic));
1334
free(local_hostname);
1335
cl_raw_list_unlock(cl_com_handle_list);
1336
if (commlib_error) {
1337
*commlib_error = CL_RETVAL_MUTEX_ERROR;
1339
cl_commlib_push_application_error(CL_LOG_ERROR, CL_RETVAL_MUTEX_ERROR, NULL);
1343
new_handle->local = cl_com_create_endpoint(local_hostname, component_name, component_id, &local_addr);
1344
if (new_handle->local == NULL) {
1346
mutex_ret_val = pthread_mutex_destroy(new_handle->connection_list_mutex);
1347
if (mutex_ret_val != EBUSY) {
1348
free(new_handle->connection_list_mutex);
1350
mutex_ret_val = pthread_mutex_destroy(new_handle->messages_ready_mutex);
1351
if (mutex_ret_val != EBUSY) {
1352
free(new_handle->messages_ready_mutex);
1354
cl_com_free_handle_statistic(&(new_handle->statistic));
1356
free(local_hostname);
1357
cl_raw_list_unlock(cl_com_handle_list);
1358
if (commlib_error) {
1359
*commlib_error = CL_RETVAL_MALLOC;
1361
cl_commlib_push_application_error(CL_LOG_ERROR, CL_RETVAL_MALLOC, NULL);
1365
if ((return_value=cl_app_message_queue_setup(&(new_handle->send_message_queue), "send message queue", 1)) != CL_RETVAL_OK) {
1367
cl_app_message_queue_cleanup(&(new_handle->received_message_queue));
1368
cl_app_message_queue_cleanup(&(new_handle->send_message_queue));
1369
cl_com_free_endpoint(&(new_handle->local));
1370
mutex_ret_val = pthread_mutex_destroy(new_handle->connection_list_mutex);
1371
if (mutex_ret_val != EBUSY) {
1372
free(new_handle->connection_list_mutex);
1374
mutex_ret_val = pthread_mutex_destroy(new_handle->messages_ready_mutex);
1375
if (mutex_ret_val != EBUSY) {
1376
free(new_handle->messages_ready_mutex);
1378
cl_com_free_handle_statistic(&(new_handle->statistic));
1380
free(local_hostname);
1381
cl_raw_list_unlock(cl_com_handle_list);
1382
if (commlib_error) {
1383
*commlib_error = return_value;
1385
cl_commlib_push_application_error(CL_LOG_ERROR, return_value, NULL);
1389
if ((return_value=cl_app_message_queue_setup(&(new_handle->received_message_queue), "received message queue", 1)) != CL_RETVAL_OK) {
1391
cl_app_message_queue_cleanup(&(new_handle->received_message_queue));
1392
cl_com_free_endpoint(&(new_handle->local));
1393
mutex_ret_val = pthread_mutex_destroy(new_handle->connection_list_mutex);
1394
if (mutex_ret_val != EBUSY) {
1395
free(new_handle->connection_list_mutex);
1397
mutex_ret_val = pthread_mutex_destroy(new_handle->messages_ready_mutex);
1398
if (mutex_ret_val != EBUSY) {
1399
free(new_handle->messages_ready_mutex);
1401
cl_com_free_handle_statistic(&(new_handle->statistic));
1403
free(local_hostname);
1404
cl_raw_list_unlock(cl_com_handle_list);
1405
if (commlib_error) {
1406
*commlib_error = return_value;
1408
cl_commlib_push_application_error(CL_LOG_ERROR, return_value, NULL);
1412
if ((return_value=cl_connection_list_setup(&(new_handle->connection_list), "connection list", 1, CL_TRUE)) != CL_RETVAL_OK) {
1414
cl_app_message_queue_cleanup(&(new_handle->send_message_queue));
1415
cl_app_message_queue_cleanup(&(new_handle->received_message_queue));
1416
cl_connection_list_cleanup(&(new_handle->connection_list));
1417
cl_com_free_endpoint(&(new_handle->local));
1418
mutex_ret_val = pthread_mutex_destroy(new_handle->connection_list_mutex);
1419
if (mutex_ret_val != EBUSY) {
1420
free(new_handle->connection_list_mutex);
1422
mutex_ret_val = pthread_mutex_destroy(new_handle->messages_ready_mutex);
1423
if (mutex_ret_val != EBUSY) {
1424
free(new_handle->messages_ready_mutex);
1426
cl_com_free_handle_statistic(&(new_handle->statistic));
1428
free(local_hostname);
1429
cl_raw_list_unlock(cl_com_handle_list);
1430
if (commlib_error) {
1431
*commlib_error = return_value;
1433
cl_commlib_push_application_error(CL_LOG_ERROR, return_value, NULL);
1437
if ((return_value=cl_string_list_setup(&(new_handle->allowed_host_list), "allowed host list")) != CL_RETVAL_OK) {
1440
cl_string_list_cleanup(&(new_handle->allowed_host_list));
1441
cl_app_message_queue_cleanup(&(new_handle->send_message_queue));
1442
cl_app_message_queue_cleanup(&(new_handle->received_message_queue));
1443
cl_connection_list_cleanup(&(new_handle->connection_list));
1444
cl_com_free_endpoint(&(new_handle->local));
1445
mutex_ret_val = pthread_mutex_destroy(new_handle->connection_list_mutex);
1446
if (mutex_ret_val != EBUSY) {
1447
free(new_handle->connection_list_mutex);
1449
mutex_ret_val = pthread_mutex_destroy(new_handle->messages_ready_mutex);
1450
if (mutex_ret_val != EBUSY) {
1451
free(new_handle->messages_ready_mutex);
1453
cl_com_free_handle_statistic(&(new_handle->statistic));
1455
free(local_hostname);
1456
cl_raw_list_unlock(cl_com_handle_list);
1457
if (commlib_error) {
1458
*commlib_error = return_value;
1460
cl_commlib_push_application_error(CL_LOG_ERROR, return_value, NULL);
1464
/* local host name not needed anymore */
1465
free(local_hostname);
1466
local_hostname = NULL;
1468
if (new_handle->service_provider == CL_TRUE) {
1469
/* create service */
1470
cl_com_connection_t* new_con = NULL;
1472
CL_LOG(CL_LOG_INFO,"creating service ...");
1473
return_value = cl_com_setup_connection(new_handle, &new_con);
1475
if (return_value != CL_RETVAL_OK) {
1477
cl_com_close_connection(&new_con);
1478
cl_string_list_cleanup(&(new_handle->allowed_host_list));
1479
cl_app_message_queue_cleanup(&(new_handle->send_message_queue));
1480
cl_app_message_queue_cleanup(&(new_handle->received_message_queue));
1481
cl_connection_list_cleanup(&(new_handle->connection_list));
1482
cl_com_free_endpoint(&(new_handle->local));
1483
mutex_ret_val = pthread_mutex_destroy(new_handle->connection_list_mutex);
1484
if (mutex_ret_val != EBUSY) {
1485
free(new_handle->connection_list_mutex);
1487
mutex_ret_val = pthread_mutex_destroy(new_handle->messages_ready_mutex);
1488
if (mutex_ret_val != EBUSY) {
1489
free(new_handle->messages_ready_mutex);
1491
cl_com_free_handle_statistic(&(new_handle->statistic));
1493
cl_raw_list_unlock(cl_com_handle_list);
1494
if (commlib_error) {
1495
*commlib_error = return_value;
1497
cl_commlib_push_application_error(CL_LOG_ERROR, return_value, NULL);
1501
/* autoclose is not used for service connection */
1502
new_con->data_flow_type = CL_CM_CT_STREAM;
1503
new_con->auto_close_type = CL_CM_AC_UNDEFINED;
1504
new_con->handler = new_handle;
1506
return_value = cl_com_connection_request_handler_setup(new_con, new_handle->local);
1507
if (return_value != CL_RETVAL_OK) {
1509
cl_com_connection_request_handler_cleanup(new_con);
1510
cl_com_close_connection(&new_con);
1511
cl_string_list_cleanup(&(new_handle->allowed_host_list));
1512
cl_app_message_queue_cleanup(&(new_handle->send_message_queue));
1513
cl_app_message_queue_cleanup(&(new_handle->received_message_queue));
1514
cl_connection_list_cleanup(&(new_handle->connection_list));
1515
cl_com_free_endpoint(&(new_handle->local));
1516
mutex_ret_val = pthread_mutex_destroy(new_handle->connection_list_mutex);
1517
if (mutex_ret_val != EBUSY) {
1518
free(new_handle->connection_list_mutex);
1520
mutex_ret_val = pthread_mutex_destroy(new_handle->messages_ready_mutex);
1521
if (mutex_ret_val != EBUSY) {
1522
free(new_handle->messages_ready_mutex);
1524
cl_com_free_handle_statistic(&(new_handle->statistic));
1526
cl_com_free_debug_client_setup(&(new_handle->debug_client_setup));
1528
cl_com_free_ssl_setup(&(new_handle->ssl_setup));
1531
cl_raw_list_unlock(cl_com_handle_list);
1532
if (commlib_error) {
1533
*commlib_error = return_value;
1535
cl_commlib_push_application_error(CL_LOG_ERROR, return_value, NULL);
1539
new_handle->service_handler = new_con;
1542
/* Set handle service port, when we use random port */
1543
if (new_handle->service_port == 0) {
1544
int service_port = 0;
1545
if (cl_com_connection_get_service_port(new_con,&service_port) == CL_RETVAL_OK) {
1546
new_handle->service_port = service_port;
1551
/* create handle thread */
1552
thread_start_error = 0;
1553
switch(cl_com_create_threads) {
1554
case CL_NO_THREAD: {
1555
CL_LOG(CL_LOG_INFO,"no threads enabled");
1558
case CL_RW_THREAD: {
1559
CL_LOG(CL_LOG_INFO,"creating read condition ...");
1560
return_value = cl_thread_create_thread_condition(&(new_handle->read_condition));
1561
if (return_value != CL_RETVAL_OK) {
1562
CL_LOG(CL_LOG_ERROR,"could not setup read condition");
1563
thread_start_error = 1;
1567
CL_LOG(CL_LOG_INFO,"creating application read condition ...");
1568
return_value = cl_thread_create_thread_condition(&(new_handle->app_condition));
1569
if (return_value != CL_RETVAL_OK) {
1570
CL_LOG(CL_LOG_ERROR,"could not setup application read condition");
1571
thread_start_error = 1;
1575
CL_LOG(CL_LOG_INFO,"creating write condition ...");
1576
return_value = cl_thread_create_thread_condition(&(new_handle->write_condition));
1577
if (return_value != CL_RETVAL_OK) {
1578
CL_LOG(CL_LOG_ERROR,"could not setup write condition");
1579
thread_start_error = 1;
1584
CL_LOG(CL_LOG_INFO,"starting handle service thread ...");
1585
snprintf(help_buffer,80,"%s_service",new_handle->local->comp_name);
1586
return_value = cl_thread_list_create_thread(cl_com_thread_list,
1587
&(new_handle->service_thread),
1589
help_buffer, 2, cl_com_handle_service_thread, NULL, (void*)new_handle);
1590
if (return_value != CL_RETVAL_OK) {
1591
CL_LOG(CL_LOG_ERROR,"could not start handle service thread");
1592
thread_start_error = 1;
1596
CL_LOG(CL_LOG_INFO,"starting handle read thread ...");
1597
snprintf(help_buffer,80,"%s_read",new_handle->local->comp_name);
1598
return_value = cl_thread_list_create_thread(cl_com_thread_list,
1599
&(new_handle->read_thread),
1601
help_buffer, 3, cl_com_handle_read_thread, NULL, NULL);
1602
if (return_value != CL_RETVAL_OK) {
1603
CL_LOG(CL_LOG_ERROR,"could not start handle read thread");
1604
thread_start_error = 1;
1608
CL_LOG(CL_LOG_INFO,"starting handle write thread ...");
1609
snprintf(help_buffer,80,"%s_write",new_handle->local->comp_name);
1610
return_value = cl_thread_list_create_thread(cl_com_thread_list,
1611
&(new_handle->write_thread),
1613
help_buffer, 2, cl_com_handle_write_thread, NULL, NULL);
1614
if (return_value != CL_RETVAL_OK) {
1615
CL_LOG(CL_LOG_ERROR,"could not start handle write thread");
1616
thread_start_error = 1;
1623
* Do NOT touch return_value, it contains information about
1624
* thread start up errors !
1626
if (thread_start_error != 0) {
1628
if (new_handle->service_handler != NULL) {
1629
cl_com_connection_request_handler_cleanup(new_handle->service_handler);
1630
cl_com_close_connection(&(new_handle->service_handler));
1632
cl_connection_list_cleanup(&(new_handle->connection_list));
1633
cl_app_message_queue_cleanup(&(new_handle->send_message_queue));
1634
cl_app_message_queue_cleanup(&(new_handle->received_message_queue));
1635
cl_string_list_cleanup(&(new_handle->allowed_host_list));
1636
cl_com_free_endpoint(&(new_handle->local));
1637
mutex_ret_val = pthread_mutex_destroy(new_handle->connection_list_mutex);
1638
if (mutex_ret_val != EBUSY) {
1639
free(new_handle->connection_list_mutex);
1641
mutex_ret_val = pthread_mutex_destroy(new_handle->messages_ready_mutex);
1642
if (mutex_ret_val != EBUSY) {
1643
free(new_handle->messages_ready_mutex);
1645
cl_com_free_handle_statistic(&(new_handle->statistic));
1647
cl_raw_list_unlock(cl_com_handle_list);
1648
if (commlib_error) {
1649
*commlib_error = return_value;
1651
cl_commlib_push_application_error(CL_LOG_ERROR, return_value, NULL);
1654
cl_handle_list_append_handle(cl_com_handle_list, new_handle,0);
1655
cl_raw_list_unlock(cl_com_handle_list);
1656
CL_LOG(CL_LOG_INFO, "new handle created");
1657
if (commlib_error) {
1658
*commlib_error = CL_RETVAL_OK;
1663
#ifdef __CL_FUNCTION__
1664
#undef __CL_FUNCTION__
1666
#define __CL_FUNCTION__ "cl_commlib_shutdown_handle()"
1667
int cl_commlib_shutdown_handle(cl_com_handle_t* handle, cl_bool_t return_for_messages) {
1668
cl_connection_list_elem_t* elem = NULL;
1669
cl_thread_settings_t* thread_settings = NULL;
1671
cl_bool_t connection_list_empty = CL_FALSE;
1672
cl_bool_t trigger_write = CL_FALSE;
1673
cl_app_message_queue_elem_t* mq_elem = NULL;
1674
int mq_return_value = CL_RETVAL_OK;
1678
cl_commlib_check_callback_functions();
1680
if (handle == NULL) {
1681
return CL_RETVAL_PARAMS;
1684
if (cl_com_handle_list == NULL) {
1685
CL_LOG(CL_LOG_ERROR,"cl_com_setup_commlib() not called");
1686
return CL_RETVAL_PARAMS;
1689
cl_raw_list_lock(cl_com_handle_list);
1691
CL_LOG(CL_LOG_INFO,"shutting down handle ...");
1692
if ( handle->do_shutdown == 0) {
1693
/* wait for connection close response message from heach connection */
1694
/* get current timeout time */
1695
gettimeofday(&now,NULL);
1696
handle->shutdown_timeout = now.tv_sec + handle->acknowledge_timeout + handle->close_connection_timeout;
1699
/* flush send message queue */
1700
cl_raw_list_lock(handle->send_message_queue);
1701
while((mq_elem = cl_app_message_queue_get_first_elem(handle->send_message_queue)) != NULL) {
1702
CL_LOG(CL_LOG_INFO,"flushing send message queue ...");
1704
mq_return_value = cl_commlib_send_message_to_endpoint(handle, mq_elem->snd_destination,
1705
mq_elem->snd_ack_type, mq_elem->snd_data,
1706
mq_elem->snd_size, mq_elem->snd_response_mid,
1708
/* remove queue entries */
1709
cl_raw_list_remove_elem(handle->send_message_queue, mq_elem->raw_elem);
1710
if (mq_return_value != CL_RETVAL_OK) {
1711
CL_LOG_STR(CL_LOG_ERROR,"can't send message:", cl_get_error_text(mq_return_value));
1712
free(mq_elem->snd_data);
1714
cl_com_free_endpoint(&(mq_elem->snd_destination));
1717
cl_raw_list_unlock(handle->send_message_queue);
1719
if (return_for_messages == CL_TRUE) {
1720
handle->do_shutdown = 1; /* stop accepting new connections , don't delete any messages */
1722
handle->do_shutdown = 2; /* stop accepting new connections , return not handled response messages in cl_commlib_receive_message() */
1725
/* wait for empty connection list */
1726
while( connection_list_empty == CL_FALSE) {
1727
cl_bool_t have_message_connections = CL_FALSE;
1728
cl_bool_t ignore_timeout = cl_com_get_ignore_timeouts_flag();
1729
connection_list_empty = CL_TRUE;
1730
trigger_write = CL_FALSE;
1732
cl_raw_list_lock(handle->connection_list);
1733
elem = cl_connection_list_get_first_elem(handle->connection_list);
1735
connection_list_empty = CL_FALSE;
1736
if (elem->connection->data_flow_type == CL_CM_CT_MESSAGE) {
1737
have_message_connections = CL_TRUE;
1738
if ( elem->connection->connection_state == CL_CONNECTED &&
1739
elem->connection->connection_sub_state == CL_COM_WORK &&
1740
elem->connection->ccm_received == 0) {
1741
cl_commlib_send_ccm_message(elem->connection);
1742
trigger_write = CL_TRUE;
1743
elem->connection->connection_sub_state = CL_COM_SENDING_CCM;
1745
CL_LOG_STR(CL_LOG_INFO,"wait for connection removal, current state is",
1746
cl_com_get_connection_state(elem->connection));
1747
CL_LOG_STR(CL_LOG_INFO,"wait for connection removal, current sub state is",
1748
cl_com_get_connection_sub_state(elem->connection));
1749
if ( ignore_timeout == CL_TRUE ) {
1750
switch( elem->connection->connection_state) {
1754
case CL_CONNECTED: {
1755
CL_LOG(CL_LOG_INFO,"we are connected, don't ignore timeouts");
1756
ignore_timeout = CL_FALSE;
1760
case CL_DISCONNECTED:
1767
elem = cl_connection_list_get_next_elem(elem);
1769
cl_raw_list_unlock(handle->connection_list);
1773
* If there are still connections, trigger read/write of messages and
1774
* return for messages (if return_for_messages is set)
1776
if ( connection_list_empty == CL_FALSE) {
1777
int return_value = CL_RETVAL_OK;
1778
/* still waiting for messages */
1779
switch(cl_com_create_threads) {
1780
case CL_NO_THREAD: {
1781
pthread_mutex_lock(handle->messages_ready_mutex);
1782
if (handle->messages_ready_for_read != 0) {
1783
pthread_mutex_unlock(handle->messages_ready_mutex);
1784
/* return for messages */
1785
if (return_for_messages == CL_TRUE) {
1786
cl_raw_list_unlock(cl_com_handle_list);
1787
CL_LOG(CL_LOG_INFO,"delivering MESSAGES");
1788
return CL_RETVAL_MESSAGE_IN_BUFFER;
1790
/* delete messages */
1791
cl_com_message_t* message = NULL;
1792
cl_com_endpoint_t* sender = NULL;
1794
* cl_commlib_receive_message() can be called inside of cl_commlib_shutdown_handle()
1795
* because cl_commlib_shutdown_handle() is called from application context.
1798
cl_commlib_receive_message(handle,NULL, NULL, 0, CL_FALSE, 0, &message, &sender);
1799
if (message != NULL) {
1800
CL_LOG(CL_LOG_WARNING,"deleting message");
1801
cl_com_free_message(&message);
1803
if (sender != NULL) {
1804
CL_LOG_STR(CL_LOG_WARNING,"deleted message from:", sender->comp_host);
1805
cl_com_free_endpoint(&sender);
1810
pthread_mutex_unlock(handle->messages_ready_mutex);
1812
cl_commlib_trigger(handle, 1);
1815
case CL_RW_THREAD: {
1816
if (trigger_write == CL_TRUE) {
1817
cl_thread_trigger_event(handle->write_thread);
1821
pthread_mutex_lock(handle->messages_ready_mutex);
1822
if (handle->messages_ready_for_read != 0) {
1823
pthread_mutex_unlock(handle->messages_ready_mutex);
1825
if ( return_for_messages == CL_TRUE) {
1826
/* return for messages */
1827
cl_raw_list_unlock(cl_com_handle_list);
1828
CL_LOG(CL_LOG_ERROR,"delivering MESSAGES");
1829
return CL_RETVAL_MESSAGE_IN_BUFFER;
1831
/* delete messages */
1832
cl_com_message_t* message = NULL;
1833
cl_com_endpoint_t* sender = NULL;
1834
cl_commlib_receive_message(handle,NULL, NULL, 0, CL_FALSE, 0, &message, &sender);
1835
if (message != NULL) {
1836
CL_LOG(CL_LOG_WARNING,"deleting message");
1837
cl_com_free_message(&message);
1839
if (sender != NULL) {
1840
CL_LOG_STR(CL_LOG_WARNING,"deleted message from:", sender->comp_host);
1841
cl_com_free_endpoint(&sender);
1846
pthread_mutex_unlock(handle->messages_ready_mutex);
1849
CL_LOG(CL_LOG_INFO,"APPLICATION WAITING for CCRM");
1850
return_value = cl_thread_wait_for_thread_condition(handle->read_condition,
1851
handle->select_sec_timeout,
1852
handle->select_usec_timeout);
1853
if (return_value == CL_RETVAL_CONDITION_WAIT_TIMEOUT) {
1854
CL_LOG(CL_LOG_WARNING,"APPLICATION GOT CONDITION WAIT TIMEOUT WHILE WAITING FOR CCRM");
1860
/* shutdown stream connections when there are no more message connections */
1861
cl_raw_list_lock(handle->connection_list);
1862
elem = cl_connection_list_get_first_elem(handle->connection_list);
1864
connection_list_empty = CL_FALSE;
1865
if ( elem->connection->data_flow_type == CL_CM_CT_STREAM) {
1866
if ( have_message_connections == CL_FALSE ) {
1867
/* close stream connections after message connections */
1869
if (handle->debug_client_setup->dc_mode != CL_DEBUG_CLIENT_OFF) {
1870
/* don't close STREAM connection when there are debug clients
1871
and debug data list is not empty */
1873
cl_raw_list_lock(handle->debug_client_setup->dc_debug_list);
1874
if (cl_raw_list_get_elem_count(handle->debug_client_setup->dc_debug_list) == 0) {
1875
elem->connection->connection_state = CL_CLOSING;
1876
elem->connection->connection_sub_state = CL_COM_DO_SHUTDOWN;
1878
cl_raw_list_unlock(handle->debug_client_setup->dc_debug_list);
1880
/* debug clients are not connected, just close stream connections */
1881
elem->connection->connection_state = CL_CLOSING;
1882
elem->connection->connection_sub_state = CL_COM_DO_SHUTDOWN;
1886
elem = cl_connection_list_get_next_elem(elem);
1888
cl_raw_list_unlock(handle->connection_list);
1890
cl_commlib_handle_debug_clients(handle, CL_TRUE);
1896
gettimeofday(&now,NULL);
1897
if (handle->shutdown_timeout <= now.tv_sec || ignore_timeout == CL_TRUE ) {
1898
CL_LOG(CL_LOG_ERROR,"got timeout while waiting for close response message");
1903
/* ok now we shutdown the handle */
1904
CL_LOG(CL_LOG_INFO,"shutdown of handle");
1906
/* remove handle from list */
1907
cl_raw_list_lock(handle->connection_list);
1908
elem = cl_connection_list_get_first_elem(handle->connection_list);
1910
CL_LOG(CL_LOG_ERROR,"########## connection list is not empty ##########");
1912
cl_raw_list_unlock(handle->connection_list);
1914
ret_val = cl_handle_list_remove_handle(cl_com_handle_list, handle, 0);
1915
cl_raw_list_unlock(cl_com_handle_list);
1918
/* only shutdown handle if handle was removed from list */
1919
if (ret_val == CL_RETVAL_OK) {
1920
/* delete handle thread */
1921
switch(cl_com_create_threads) {
1922
case CL_NO_THREAD: {
1923
CL_LOG(CL_LOG_INFO,"no threads enabled");
1926
case CL_RW_THREAD: {
1927
CL_LOG(CL_LOG_INFO,"shutdown handle write thread ...");
1928
thread_settings = handle->write_thread;
1929
handle->write_thread = NULL; /* set thread pointer to NULL because other threads use this pointer */
1930
ret_val = cl_thread_list_delete_thread(cl_com_thread_list, thread_settings);
1931
if (ret_val != CL_RETVAL_OK) {
1932
CL_LOG_STR(CL_LOG_ERROR,"error shutting down handle write thread", cl_get_error_text(ret_val));
1934
CL_LOG(CL_LOG_INFO,"shutdown handle write thread OK");
1937
CL_LOG(CL_LOG_INFO,"shutdown handle read thread ...");
1938
thread_settings = handle->read_thread;
1939
handle->read_thread = NULL; /* set thread pointer to NULL because other threads use this pointer */
1940
ret_val = cl_thread_list_delete_thread(cl_com_thread_list, thread_settings);
1941
if (ret_val != CL_RETVAL_OK) {
1942
CL_LOG_STR(CL_LOG_ERROR,"error shutting down handle read thread", cl_get_error_text(ret_val));
1944
CL_LOG(CL_LOG_INFO,"shutdown handle read thread OK");
1947
CL_LOG(CL_LOG_INFO,"shutdown handle service thread ...");
1948
thread_settings = handle->service_thread;
1949
handle->service_thread = NULL; /* set thread pointer to NULL because other threads use this pointer */
1950
ret_val = cl_thread_list_delete_thread(cl_com_thread_list,thread_settings );
1951
if (ret_val != CL_RETVAL_OK) {
1952
CL_LOG_STR(CL_LOG_ERROR,"error shutting down handle service thread", cl_get_error_text(ret_val));
1954
CL_LOG(CL_LOG_INFO,"shutdown handle service thread OK");
1957
/* all threads are down, deleteing condition variables */
1958
CL_LOG(CL_LOG_INFO,"shutdown handle write condition ...");
1959
ret_val = cl_thread_delete_thread_condition(&(handle->write_condition));
1960
if (ret_val != CL_RETVAL_OK) {
1961
CL_LOG_STR(CL_LOG_ERROR,"error shutting down handle write condition", cl_get_error_text(ret_val));
1963
CL_LOG(CL_LOG_INFO,"shutdown handle write condition OK");
1966
CL_LOG(CL_LOG_INFO,"shutdown handle application read condition ...");
1967
ret_val = cl_thread_delete_thread_condition(&(handle->app_condition));
1968
if (ret_val != CL_RETVAL_OK) {
1969
CL_LOG_STR(CL_LOG_ERROR,"error shutting down handle application read condition", cl_get_error_text(ret_val));
1971
CL_LOG(CL_LOG_INFO,"shutdown handle application read condition OK");
1974
CL_LOG(CL_LOG_INFO,"shutdown handle read condition ...");
1975
ret_val = cl_thread_delete_thread_condition(&(handle->read_condition));
1976
if (ret_val != CL_RETVAL_OK) {
1977
CL_LOG_STR(CL_LOG_ERROR,"error shutting down handle read condition", cl_get_error_text(ret_val));
1979
CL_LOG(CL_LOG_INFO,"shutdown handle read condition OK");
1985
/* cleanup all connection list entries */
1986
cl_raw_list_lock(handle->connection_list);
1988
/* mark each connection to close */
1989
elem = cl_connection_list_get_first_elem(handle->connection_list);
1991
elem->connection->connection_state = CL_CLOSING;
1992
elem->connection->connection_sub_state = CL_COM_DO_SHUTDOWN;
1993
elem = cl_connection_list_get_next_elem(elem);
1995
cl_raw_list_unlock(handle->connection_list);
1997
cl_connection_list_destroy_connections_to_close(handle);
1999
/* shutdown of service */
2000
if (handle->service_provider == CL_TRUE) {
2001
cl_com_connection_request_handler_cleanup(handle->service_handler);
2002
cl_com_close_connection(&(handle->service_handler));
2005
cl_app_message_queue_cleanup(&(handle->send_message_queue));
2006
cl_app_message_queue_cleanup(&(handle->received_message_queue));
2007
cl_connection_list_cleanup(&(handle->connection_list));
2008
cl_com_free_endpoint(&(handle->local));
2010
cl_string_list_cleanup(&(handle->allowed_host_list));
2012
if (handle->messages_ready_mutex != NULL) {
2013
if (pthread_mutex_destroy(handle->messages_ready_mutex) != EBUSY) {
2014
free(handle->messages_ready_mutex);
2015
handle->messages_ready_mutex = NULL;
2018
if (handle->connection_list_mutex != NULL) {
2019
if (pthread_mutex_destroy(handle->connection_list_mutex) != EBUSY) {
2020
free(handle->connection_list_mutex);
2021
handle->connection_list_mutex = NULL;
2024
cl_com_free_handle_statistic(&(handle->statistic));
2026
cl_com_free_debug_client_setup(&(handle->debug_client_setup));
2028
cl_com_free_ssl_setup(&(handle->ssl_setup));
2031
return CL_RETVAL_OK;
2036
#ifdef __CL_FUNCTION__
2037
#undef __CL_FUNCTION__
2039
#define __CL_FUNCTION__ "cl_com_setup_connection()"
2040
int cl_com_setup_connection(cl_com_handle_t* handle, cl_com_connection_t** connection) {
2041
int ret_val = CL_RETVAL_HANDLE_NOT_FOUND;
2042
if (handle != NULL) {
2043
switch(handle->framework) {
2045
ret_val = cl_com_tcp_setup_connection(connection,
2046
handle->service_port,
2047
handle->connect_port,
2048
handle->data_flow_type,
2049
handle->auto_close_mode,
2052
handle->tcp_connect_mode);
2056
ret_val = cl_com_ssl_setup_connection(connection,
2057
handle->service_port,
2058
handle->connect_port,
2059
handle->data_flow_type,
2060
handle->auto_close_mode,
2063
handle->tcp_connect_mode,
2067
case CL_CT_UNDEFINED: {
2068
ret_val = CL_RETVAL_UNDEFINED_FRAMEWORK;
2077
#ifdef __CL_FUNCTION__
2078
#undef __CL_FUNCTION__
2080
#define __CL_FUNCTION__ "cl_com_set_auto_close_mode()"
2081
int cl_com_set_auto_close_mode(cl_com_handle_t* handle, cl_xml_connection_autoclose_t mode ) {
2082
if (handle == NULL) {
2083
return CL_RETVAL_PARAMS;
2085
handle->auto_close_mode = mode;
2087
case CL_CM_AC_ENABLED:
2088
CL_LOG(CL_LOG_INFO,"auto close mode is enabled");
2090
case CL_CM_AC_DISABLED:
2091
CL_LOG(CL_LOG_INFO,"auto close mode is disabled");
2094
CL_LOG(CL_LOG_INFO,"unexpeced auto close mode");
2096
return CL_RETVAL_OK;
2099
#ifdef __CL_FUNCTION__
2100
#undef __CL_FUNCTION__
2102
#define __CL_FUNCTION__ "cl_com_get_auto_close_mode()"
2103
int cl_com_get_auto_close_mode(cl_com_handle_t* handle, cl_xml_connection_autoclose_t* mode ) {
2104
if (handle == NULL || mode == NULL ) {
2105
return CL_RETVAL_PARAMS;
2107
*mode = handle->auto_close_mode;
2108
return CL_RETVAL_OK;
2112
#ifdef __CL_FUNCTION__
2113
#undef __CL_FUNCTION__
2115
#define __CL_FUNCTION__ "cl_com_set_max_connection_close_mode()"
2116
int cl_com_set_max_connection_close_mode(cl_com_handle_t* handle, cl_max_count_t mode) {
2118
if (handle == NULL) {
2119
return CL_RETVAL_PARAMS;
2121
handle->max_connection_count_reached = CL_FALSE;
2122
handle->max_con_close_mode = mode;
2123
return CL_RETVAL_OK;
2126
#ifdef __CL_FUNCTION__
2127
#undef __CL_FUNCTION__
2129
#define __CL_FUNCTION__ "cl_com_get_max_connection_close_mode()"
2130
int cl_com_get_max_connection_close_mode(cl_com_handle_t* handle, cl_max_count_t* mode) {
2131
if (handle == NULL|| mode == NULL ) {
2132
return CL_RETVAL_PARAMS;
2134
*mode = handle->max_con_close_mode;
2135
return CL_RETVAL_OK;
2138
#ifdef __CL_FUNCTION__
2139
#undef __CL_FUNCTION__
2141
#define __CL_FUNCTION__ "cl_com_set_max_connections()"
2142
int cl_com_set_max_connections(cl_com_handle_t* handle, unsigned long value) {
2143
if (handle == NULL || value < 1) {
2144
return CL_RETVAL_PARAMS;
2147
handle->max_open_connections = value;
2148
return CL_RETVAL_OK;
2151
#ifdef __CL_FUNCTION__
2152
#undef __CL_FUNCTION__
2154
#define __CL_FUNCTION__ "cl_com_get_max_connections()"
2155
int cl_com_get_max_connections(cl_com_handle_t* handle, unsigned long* value) {
2156
if (handle == NULL || value == NULL) {
2157
return CL_RETVAL_PARAMS;
2160
*value = handle->max_open_connections;
2161
return CL_RETVAL_OK;
2165
#ifdef __CL_FUNCTION__
2166
#undef __CL_FUNCTION__
2168
#define __CL_FUNCTION__ "cl_com_get_handle()"
2169
cl_com_handle_t* cl_com_get_handle(const char* component_name, unsigned long component_id) {
2170
cl_handle_list_elem_t* elem = NULL;
2171
cl_com_handle_t* ret_handle = NULL;
2173
if (cl_com_handle_list == NULL) {
2177
if ( component_name == NULL) {
2178
CL_LOG(CL_LOG_WARNING,"cl_com_get_handle() - parameter error");
2183
if ( cl_raw_list_lock(cl_com_handle_list) != CL_RETVAL_OK) {
2184
CL_LOG(CL_LOG_WARNING,"cl_com_get_handle() - lock error");
2188
CL_LOG_STR(CL_LOG_INFO,"try to find handle for", component_name);
2189
if ( component_id != 0) {
2190
CL_LOG_INT(CL_LOG_INFO,"handle must have id", (int)component_id);
2192
CL_LOG(CL_LOG_INFO,"ignoring component_id");
2194
elem = cl_handle_list_get_first_elem(cl_com_handle_list);
2195
while ( elem != NULL) {
2196
cl_com_handle_t* handle = elem->handle;
2198
/* if component id is zero, we just search for the name */
2199
if (handle->local->comp_id == component_id || component_id == 0) {
2200
if (strcmp(handle->local->comp_name, component_name) == 0) {
2201
if (ret_handle != NULL) {
2202
CL_LOG(CL_LOG_ERROR,"cl_com_get_handle() - found more than one handle");
2204
ret_handle = handle;
2208
elem = cl_handle_list_get_next_elem(elem);
2212
if ( cl_raw_list_unlock(cl_com_handle_list ) != CL_RETVAL_OK) {
2213
CL_LOG(CL_LOG_WARNING,"cl_com_get_handle() - unlock error");
2217
if (ret_handle == NULL) {
2218
CL_LOG(CL_LOG_INFO,"cl_com_get_handle() - handle not found");
2224
cl_thread_mode_t cl_commlib_get_thread_state(void) {
2225
return cl_com_create_threads;
2228
#ifdef __CL_FUNCTION__
2229
#undef __CL_FUNCTION__
2231
#define __CL_FUNCTION__ "cl_com_set_alias_file()"
2232
int cl_com_set_alias_file(const char* alias_file) {
2233
int ret_val = CL_RETVAL_NO_FRAMEWORK_INIT;
2235
if (alias_file == NULL) {
2236
return CL_RETVAL_PARAMS;
2239
if (cl_com_host_list != NULL) {
2240
ret_val = cl_host_list_set_alias_file(cl_com_get_host_list(), alias_file );
2246
#ifdef __CL_FUNCTION__
2247
#undef __CL_FUNCTION__
2249
#define __CL_FUNCTION__ "cl_com_set_alias_file_dirty()"
2250
int cl_com_set_alias_file_dirty(void) {
2251
int ret_val = CL_RETVAL_NO_FRAMEWORK_INIT;
2253
if (cl_com_host_list != NULL) {
2254
ret_val = cl_host_list_set_alias_file_dirty(cl_com_get_host_list());
2262
#ifdef __CL_FUNCTION__
2263
#undef __CL_FUNCTION__
2265
#define __CL_FUNCTION__ "cl_com_append_host_alias()"
2266
int cl_com_append_host_alias(char* local_resolved_name, char* alias_name) {
2267
int ret_val = CL_RETVAL_OK;
2268
cl_host_list_data_t* ldata = NULL;
2270
if (local_resolved_name == NULL || alias_name == NULL) {
2271
return CL_RETVAL_PARAMS;
2274
ldata = cl_host_list_get_data(cl_com_get_host_list());
2275
if (ldata == NULL) {
2276
return CL_RETVAL_NO_FRAMEWORK_INIT;
2278
ret_val = cl_host_alias_list_append_host(ldata->host_alias_list, local_resolved_name, alias_name, 1 );
2279
if (ret_val == CL_RETVAL_OK) {
2280
CL_LOG(CL_LOG_INFO,"added host alias:");
2281
CL_LOG_STR(CL_LOG_INFO,"local resolved name:",local_resolved_name );
2282
CL_LOG_STR(CL_LOG_INFO,"aliased name :",alias_name);
2287
#ifdef __CL_FUNCTION__
2288
#undef __CL_FUNCTION__
2290
#define __CL_FUNCTION__ "cl_com_remove_host_alias()"
2291
int cl_com_remove_host_alias(char* alias_name) {
2292
int ret_val = CL_RETVAL_OK;
2293
cl_host_list_data_t* ldata = NULL;
2294
cl_host_alias_list_elem_t* elem = NULL;
2295
if ( alias_name == NULL) {
2296
return CL_RETVAL_PARAMS;
2299
ldata = cl_host_list_get_data(cl_com_get_host_list());
2300
if (ldata == NULL) {
2301
return CL_RETVAL_NO_FRAMEWORK_INIT;
2304
cl_raw_list_lock(ldata->host_alias_list);
2305
elem = cl_host_alias_list_get_first_elem(ldata->host_alias_list);
2307
if (strcmp(elem->alias_name,alias_name) == 0) {
2308
CL_LOG(CL_LOG_INFO,"removing host alias:");
2309
CL_LOG_STR(CL_LOG_INFO,"local resolved name:",elem->local_resolved_hostname );
2310
CL_LOG_STR(CL_LOG_INFO,"aliased name :",elem->alias_name);
2312
ret_val = cl_host_alias_list_remove_host(ldata->host_alias_list,elem,0);
2313
cl_raw_list_unlock(ldata->host_alias_list);
2314
if (ret_val != CL_RETVAL_OK) {
2315
CL_LOG(CL_LOG_ERROR,"error removing host alias");
2319
elem = cl_host_alias_list_get_next_elem(elem);
2321
cl_raw_list_unlock(ldata->host_alias_list);
2322
return CL_RETVAL_UNKNOWN;
2325
#ifdef __CL_FUNCTION__
2326
#undef __CL_FUNCTION__
2328
#define __CL_FUNCTION__ "cl_com_append_known_endpoint_from_name()"
2330
cl_com_append_known_endpoint_from_name(char* unresolved_comp_host,
2332
unsigned long comp_id,
2334
cl_xml_connection_autoclose_t autoclose,
2335
cl_bool_t is_static)
2338
char* resolved_hostname = NULL;
2339
struct in_addr in_addr;
2340
cl_com_endpoint_t* endpoint = NULL;
2342
if (unresolved_comp_host == NULL || comp_name == NULL) {
2343
return CL_RETVAL_PARAMS;
2346
retval = cl_com_cached_gethostbyname(unresolved_comp_host, &resolved_hostname, &in_addr, NULL, NULL );
2347
if (retval != CL_RETVAL_OK) {
2348
CL_LOG_STR(CL_LOG_ERROR,"could not resolve host", unresolved_comp_host);
2352
endpoint = cl_com_create_endpoint(resolved_hostname, comp_name, comp_id, &in_addr);
2353
if (endpoint == NULL) {
2354
free(resolved_hostname);
2355
return CL_RETVAL_MALLOC;
2358
retval = cl_com_append_known_endpoint(endpoint, comp_port, autoclose, is_static);
2360
free(resolved_hostname);
2361
cl_com_free_endpoint(&endpoint);
2366
#ifdef __CL_FUNCTION__
2367
#undef __CL_FUNCTION__
2369
#define __CL_FUNCTION__ "cl_com_append_known_endpoint()"
2370
int cl_com_append_known_endpoint(cl_com_endpoint_t* endpoint, int service_port, cl_xml_connection_autoclose_t autoclose, cl_bool_t is_static) {
2371
return cl_endpoint_list_define_endpoint(cl_com_get_endpoint_list(), endpoint, service_port, autoclose, is_static);
2374
#ifdef __CL_FUNCTION__
2375
#undef __CL_FUNCTION__
2377
#define __CL_FUNCTION__ "cl_com_remove_known_endpoint()"
2378
int cl_com_remove_known_endpoint(cl_com_endpoint_t* endpoint) {
2379
return cl_endpoint_list_undefine_endpoint(cl_com_get_endpoint_list(), endpoint);
2383
#ifdef __CL_FUNCTION__
2384
#undef __CL_FUNCTION__
2386
#define __CL_FUNCTION__ "cl_com_get_known_endpoint_autoclose_mode()"
2387
int cl_com_get_known_endpoint_autoclose_mode(cl_com_endpoint_t* endpoint, cl_xml_connection_autoclose_t* auto_close_mode ) {
2388
return cl_endpoint_list_get_autoclose_mode(cl_com_get_endpoint_list(), endpoint, auto_close_mode);
2392
#ifdef __CL_FUNCTION__
2393
#undef __CL_FUNCTION__
2395
#define __CL_FUNCTION__ "cl_com_get_known_endpoint_port()"
2396
int cl_com_get_known_endpoint_port(cl_com_endpoint_t* endpoint, int* service_port) {
2397
return cl_endpoint_list_get_service_port(cl_com_get_endpoint_list(), endpoint, service_port);
2402
#ifdef __CL_FUNCTION__
2403
#undef __CL_FUNCTION__
2405
#define __CL_FUNCTION__ "cl_com_remove_known_endpoint_from_name()"
2406
int cl_com_remove_known_endpoint_from_name(const char* unresolved_comp_host, const char* comp_name, unsigned long comp_id) {
2407
int ret_val = CL_RETVAL_PARAMS;
2409
char* resolved_hostname = NULL;
2410
struct in_addr in_addr;
2411
cl_com_endpoint_t* endpoint = NULL;
2413
if (unresolved_comp_host == NULL || comp_name == NULL) {
2417
ret_val = cl_com_cached_gethostbyname(unresolved_comp_host, &resolved_hostname, &in_addr, NULL, NULL );
2418
if (ret_val != CL_RETVAL_OK) {
2419
CL_LOG_STR(CL_LOG_ERROR,"could not resolve host", unresolved_comp_host);
2423
endpoint = cl_com_create_endpoint(resolved_hostname, comp_name, comp_id, &in_addr);
2424
if (endpoint == NULL) {
2425
free(resolved_hostname);
2426
return CL_RETVAL_MALLOC;
2429
ret_val = cl_com_remove_known_endpoint(endpoint);
2431
free(resolved_hostname);
2432
cl_com_free_endpoint(&endpoint);
2438
#ifdef __CL_FUNCTION__
2439
#undef __CL_FUNCTION__
2441
#define __CL_FUNCTION__ "cl_com_get_known_endpoint_autoclose_mode_from_name()"
2442
int cl_com_get_known_endpoint_autoclose_mode_from_name(char* unresolved_comp_host, char* comp_name, unsigned long comp_id, cl_xml_connection_autoclose_t* auto_close_mode ) {
2443
int retval = CL_RETVAL_PARAMS;
2445
char* resolved_hostname = NULL;
2446
struct in_addr in_addr;
2447
cl_com_endpoint_t* endpoint = NULL;
2449
if (unresolved_comp_host == NULL || comp_name == NULL) {
2453
retval = cl_com_cached_gethostbyname(unresolved_comp_host, &resolved_hostname, &in_addr, NULL, NULL );
2454
if (retval != CL_RETVAL_OK) {
2455
CL_LOG_STR(CL_LOG_ERROR,"could not resolve host",unresolved_comp_host);
2459
endpoint = cl_com_create_endpoint(resolved_hostname,comp_name , comp_id, &in_addr);
2460
if (endpoint == NULL) {
2461
free(resolved_hostname);
2462
return CL_RETVAL_MALLOC;
2465
retval = cl_com_get_known_endpoint_autoclose_mode(endpoint, auto_close_mode);
2467
free(resolved_hostname);
2468
cl_com_free_endpoint(&endpoint);
2474
#ifdef __CL_FUNCTION__
2475
#undef __CL_FUNCTION__
2477
#define __CL_FUNCTION__ "cl_com_get_known_service_port_from_name()"
2478
int cl_com_get_known_endpoint_port_from_name(char* unresolved_comp_host, char* comp_name, unsigned long comp_id, int* service_port ) {
2479
int retval = CL_RETVAL_OK;
2481
char* resolved_hostname = NULL;
2482
struct in_addr in_addr;
2483
cl_com_endpoint_t* endpoint = NULL;
2485
if (unresolved_comp_host == NULL || comp_name == NULL) {
2486
return CL_RETVAL_PARAMS;
2489
retval = cl_com_cached_gethostbyname(unresolved_comp_host, &resolved_hostname, &in_addr, NULL, NULL );
2490
if (retval != CL_RETVAL_OK) {
2491
CL_LOG_STR(CL_LOG_ERROR,"could not resolve host",unresolved_comp_host);
2495
endpoint = cl_com_create_endpoint(resolved_hostname,comp_name , comp_id, &in_addr);
2496
if (endpoint == NULL) {
2497
free(resolved_hostname);
2498
return CL_RETVAL_MALLOC;
2501
retval = cl_com_get_known_endpoint_port(endpoint,service_port);
2503
free(resolved_hostname);
2504
cl_com_free_endpoint(&endpoint);
2511
#ifdef __CL_FUNCTION__
2512
#undef __CL_FUNCTION__
2514
#define __CL_FUNCTION__ "cl_com_get_service_fd()"
2515
int cl_com_set_handle_fds(cl_com_handle_t* handle, fd_set* file_descriptor_set) {
2517
int ret_val = CL_RETVAL_UNKNOWN;
2518
cl_connection_list_elem_t* elem = NULL;
2519
cl_com_connection_t* connection = NULL;
2521
if (handle == NULL || file_descriptor_set == NULL) {
2522
return CL_RETVAL_PARAMS;
2525
if (handle->service_handler != NULL) {
2526
if (cl_com_connection_get_fd(handle->service_handler, &fd) == CL_RETVAL_OK) {
2527
FD_SET(fd, file_descriptor_set);
2528
ret_val = CL_RETVAL_OK;
2532
cl_raw_list_lock(handle->connection_list);
2533
elem = cl_connection_list_get_first_elem(handle->connection_list);
2535
connection = elem->connection;
2536
if (cl_com_connection_get_fd(connection, &fd) == CL_RETVAL_OK) {
2537
FD_SET(fd, file_descriptor_set);
2538
ret_val = CL_RETVAL_OK;
2540
elem = cl_connection_list_get_next_elem(elem);
2542
cl_raw_list_unlock(handle->connection_list);
2547
#ifdef __CL_FUNCTION__
2548
#undef __CL_FUNCTION__
2550
#define __CL_FUNCTION__ "cl_com_get_service_port()"
2551
int cl_com_get_service_port(cl_com_handle_t* handle, int* port) {
2552
if (handle == NULL || port == NULL) {
2553
return CL_RETVAL_PARAMS;
2555
if (handle->service_provider == CL_FALSE) {
2556
CL_LOG(CL_LOG_WARNING,"no service running");
2558
return CL_RETVAL_UNKNOWN;
2561
if (handle->service_handler == NULL) {
2562
CL_LOG(CL_LOG_ERROR,"no service handler found");
2564
return CL_RETVAL_UNKNOWN;
2567
return cl_com_connection_get_service_port(handle->service_handler, port);
2570
#ifdef __CL_FUNCTION__
2571
#undef __CL_FUNCTION__
2573
#define __CL_FUNCTION__ "cl_com_set_synchron_receive_timeout()"
2574
int cl_com_set_synchron_receive_timeout(cl_com_handle_t* handle, int timeout) {
2575
if (handle == NULL || timeout <= 0 ) {
2576
CL_LOG(CL_LOG_ERROR,"error setting synchron receive timeout");
2577
return CL_RETVAL_PARAMS;
2579
CL_LOG_INT(CL_LOG_INFO,"setting synchron receive timeout to", (int)timeout);
2580
handle->synchron_receive_timeout = timeout;
2581
return CL_RETVAL_OK;
2585
#ifdef __CL_FUNCTION__
2586
#undef __CL_FUNCTION__
2588
#define __CL_FUNCTION__ "cl_com_get_connect_port()"
2589
int cl_com_get_connect_port(cl_com_handle_t* handle, int* port) {
2590
if (handle == NULL || port == NULL) {
2591
return CL_RETVAL_PARAMS;
2594
if (handle->connect_port > 0) {
2595
*port = handle->connect_port;
2596
return CL_RETVAL_OK;
2599
return CL_RETVAL_UNKNOWN;
2603
#ifdef __CL_FUNCTION__
2604
#undef __CL_FUNCTION__
2606
#define __CL_FUNCTION__ "cl_com_add_allowed_host()"
2607
int cl_com_add_allowed_host(cl_com_handle_t* handle, char* hostname) {
2608
int retval = CL_RETVAL_OK;
2609
char* resolved_name = NULL;
2611
if (handle == NULL) {
2612
CL_LOG(CL_LOG_ERROR,"no handle specified");
2613
return CL_RETVAL_PARAMS;
2615
if (hostname == NULL) {
2616
CL_LOG(CL_LOG_ERROR,"no host specified");
2617
return CL_RETVAL_PARAMS;
2619
retval = cl_com_cached_gethostbyname(hostname, &resolved_name, NULL, NULL, NULL );
2620
if (retval != CL_RETVAL_OK) {
2621
CL_LOG_STR(CL_LOG_ERROR,"could not resolve host",hostname);
2624
free(resolved_name);
2625
resolved_name = NULL;
2626
retval = cl_string_list_append_string(handle->allowed_host_list, hostname, 1);
2627
if (retval != CL_RETVAL_OK) {
2628
CL_LOG_STR(CL_LOG_WARNING,"could not add host to allowed host list:", hostname);
2630
CL_LOG_STR(CL_LOG_INFO,"added host to allowed host list:", hostname);
2636
#ifdef __CL_FUNCTION__
2637
#undef __CL_FUNCTION__
2639
#define __CL_FUNCTION__ "cl_com_remove_allowed_host()"
2640
int cl_com_remove_allowed_host(cl_com_handle_t* handle, char* hostname) {
2641
if (handle == NULL) {
2642
CL_LOG(CL_LOG_ERROR,"no handle specified");
2643
return CL_RETVAL_PARAMS;
2645
if (hostname == NULL) {
2646
CL_LOG(CL_LOG_ERROR,"no host specified");
2647
return CL_RETVAL_PARAMS;
2649
return cl_string_list_remove_string(handle->allowed_host_list, hostname ,1);
2653
#ifdef __CL_FUNCTION__
2654
#undef __CL_FUNCTION__
2656
#define __CL_FUNCTION__ "cl_commlib_trigger()"
2657
int cl_commlib_trigger(cl_com_handle_t* handle, int synchron) {
2659
cl_commlib_check_callback_functions();
2661
if (handle != NULL) {
2662
switch(cl_com_create_threads) {
2664
return cl_com_trigger(handle, synchron);
2665
case CL_RW_THREAD: {
2666
int ret_val = CL_RETVAL_OK;
2667
/* application has nothing to do, wait for next message */
2668
pthread_mutex_lock(handle->messages_ready_mutex);
2669
if ((handle->messages_ready_for_read == 0) && (synchron == 1)) {
2670
CL_LOG(CL_LOG_INFO,"NO MESSAGES to READ, WAITING ...");
2671
pthread_mutex_unlock(handle->messages_ready_mutex);
2672
ret_val = cl_thread_wait_for_thread_condition(handle->app_condition ,
2673
handle->select_sec_timeout,
2674
handle->select_usec_timeout);
2676
pthread_mutex_unlock(handle->messages_ready_mutex);
2678
if (ret_val != CL_RETVAL_OK) {
2681
return CL_RETVAL_THREADS_ENABLED;
2685
return CL_RETVAL_PARAMS;
2688
/* this function is used for no thread implementation and must be called
2689
permanent to get data into the data structures and lists */
2690
#ifdef __CL_FUNCTION__
2691
#undef __CL_FUNCTION__
2693
#define __CL_FUNCTION__ "cl_com_trigger()"
2694
static int cl_com_trigger(cl_com_handle_t* handle, int synchron) {
2695
cl_connection_list_elem_t* elem = NULL;
2698
int retval = CL_RETVAL_OK;
2699
char tmp_string[1024];
2700
cl_bool_t ignore_timeouts = CL_FALSE;
2702
if (handle == NULL) {
2703
CL_LOG(CL_LOG_ERROR,"no handle specified");
2704
return CL_RETVAL_PARAMS;
2707
/* when threads enabled: this is done by cl_com_trigger_thread() - OK */
2708
cl_com_host_list_refresh(cl_com_get_host_list());
2709
cl_com_endpoint_list_refresh(cl_com_get_endpoint_list());
2711
/* remove broken connections */
2712
/* when threads enabled: this is done by cl_com_handle_service_thread() - OK */
2713
cl_connection_list_destroy_connections_to_close(handle);
2715
/* calculate statistics each second */
2716
gettimeofday(&now,NULL);
2717
if (handle->statistic->last_update.tv_sec != now.tv_sec) {
2718
cl_commlib_calculate_statistic(handle, CL_FALSE, 1); /* do not force update */
2719
cl_commlib_handle_debug_clients(handle, CL_TRUE);
2720
cl_commlib_app_message_queue_cleanup(handle); /* do message queue cleanup only every second */
2723
/* check number of connections */
2724
cl_commlib_check_connection_count(handle);
2726
/* do virtual select */
2727
if (synchron == 1) {
2728
retval = cl_com_open_connection_request_handler(handle, handle->select_sec_timeout, handle->select_usec_timeout, CL_RW_SELECT);
2730
retval = cl_com_open_connection_request_handler(handle, 0, 0, CL_RW_SELECT);
2733
ignore_timeouts = cl_com_get_ignore_timeouts_flag();
2735
/* read / write messages */
2736
cl_raw_list_lock(handle->connection_list);
2738
elem = cl_connection_list_get_first_elem(handle->connection_list);
2741
if (elem->connection->connection_state == CL_DISCONNECTED) {
2742
/* open connection if there are messages to send */
2743
if ( cl_raw_list_get_elem_count(elem->connection->send_message_list) > 0) {
2744
CL_LOG(CL_LOG_INFO,"setting connection state to CL_OPENING");
2745
elem->connection->connection_state = CL_OPENING;
2749
if (elem->connection->connection_state == CL_OPENING) {
2751
/* trigger connect */
2753
if (elem->connection->data_read_flag == CL_COM_DATA_READY ||
2754
(elem->connection->fd_ready_for_write == CL_COM_DATA_READY && elem->connection->data_write_flag == CL_COM_DATA_READY) ) {
2756
return_value = cl_com_open_connection(elem->connection, handle->open_connection_timeout, NULL, NULL);
2758
if (return_value != CL_RETVAL_OK) {
2759
if (ignore_timeouts == CL_TRUE || return_value != CL_RETVAL_UNCOMPLETE_WRITE) {
2760
CL_LOG_STR(CL_LOG_ERROR,"could not open connection:",cl_get_error_text(return_value));
2761
elem->connection->connection_state = CL_CLOSING;
2762
elem->connection->connection_sub_state = CL_COM_DO_SHUTDOWN;
2766
/* check timeouts */
2767
if ( elem->connection->read_buffer_timeout_time != 0) {
2768
if ( now.tv_sec >= elem->connection->read_buffer_timeout_time || ignore_timeouts == CL_TRUE) {
2769
CL_LOG(CL_LOG_ERROR,"read timeout for connection opening");
2770
elem->connection->connection_state = CL_CLOSING;
2771
elem->connection->connection_sub_state = CL_COM_DO_SHUTDOWN;
2774
if ( elem->connection->write_buffer_timeout_time != 0) {
2775
if ( now.tv_sec >= elem->connection->write_buffer_timeout_time || ignore_timeouts == CL_TRUE) {
2776
CL_LOG(CL_LOG_ERROR,"write timeout for connection opening");
2777
elem->connection->connection_state = CL_CLOSING;
2778
elem->connection->connection_sub_state = CL_COM_DO_SHUTDOWN;
2784
if (elem->connection->connection_state == CL_ACCEPTING) {
2786
if (elem->connection->data_read_flag == CL_COM_DATA_READY ||
2787
(elem->connection->fd_ready_for_write == CL_COM_DATA_READY && elem->connection->data_write_flag == CL_COM_DATA_READY) ) {
2789
return_value = cl_com_connection_complete_accept(elem->connection,handle->open_connection_timeout);
2790
if (return_value != CL_RETVAL_OK) {
2791
if (ignore_timeouts == CL_TRUE ||
2792
(return_value != CL_RETVAL_UNCOMPLETE_READ &&
2793
return_value != CL_RETVAL_UNCOMPLETE_WRITE &&
2794
return_value != CL_RETVAL_SELECT_ERROR)) {
2795
CL_LOG_STR(CL_LOG_ERROR,"connection accept error:",cl_get_error_text(return_value));
2796
elem->connection->connection_state = CL_CLOSING;
2797
elem->connection->connection_sub_state = CL_COM_DO_SHUTDOWN;
2800
elem->connection->connection_state = CL_CONNECTING;
2801
elem->connection->connection_sub_state = CL_COM_READ_INIT;
2802
elem->connection->data_read_flag = CL_COM_DATA_NOT_READY;
2805
/* check timeouts */
2806
if ( elem->connection->read_buffer_timeout_time != 0) {
2807
if ( now.tv_sec >= elem->connection->read_buffer_timeout_time ) {
2808
CL_LOG(CL_LOG_ERROR,"accept timeout for connection");
2809
elem->connection->connection_state = CL_CLOSING;
2810
elem->connection->connection_sub_state = CL_COM_DO_SHUTDOWN;
2813
if ( elem->connection->write_buffer_timeout_time != 0) {
2814
if ( now.tv_sec >= elem->connection->write_buffer_timeout_time ) {
2815
CL_LOG(CL_LOG_ERROR,"accept timeout for connection");
2816
elem->connection->connection_state = CL_CLOSING;
2817
elem->connection->connection_sub_state = CL_COM_DO_SHUTDOWN;
2823
if (elem->connection->connection_state == CL_CONNECTING) {
2825
if (elem->connection->data_read_flag == CL_COM_DATA_READY ||
2826
(elem->connection->fd_ready_for_write == CL_COM_DATA_READY && elem->connection->data_write_flag == CL_COM_DATA_READY) ) {
2827
return_value = cl_com_connection_complete_request(handle->connection_list, elem, handle->open_connection_timeout, CL_RW_SELECT);
2828
if (return_value != CL_RETVAL_OK) {
2829
if (ignore_timeouts == CL_TRUE ||
2830
(return_value != CL_RETVAL_UNCOMPLETE_READ &&
2831
return_value != CL_RETVAL_UNCOMPLETE_WRITE &&
2832
return_value != CL_RETVAL_SELECT_ERROR)) {
2833
CL_LOG_STR(CL_LOG_ERROR,"connection establish error:",cl_get_error_text(return_value));
2834
elem->connection->connection_state = CL_CLOSING;
2835
elem->connection->connection_sub_state = CL_COM_DO_SHUTDOWN;
2838
if (elem->connection->connection_state == CL_CONNECTED) {
2839
cl_commlib_finish_request_completeness(elem->connection);
2840
/* connection is now in connect state, do select before next reading */
2841
elem->connection->data_read_flag = CL_COM_DATA_NOT_READY;
2844
/* check timeouts */
2845
if ( elem->connection->read_buffer_timeout_time != 0) {
2846
if (now.tv_sec >= elem->connection->read_buffer_timeout_time ||
2847
ignore_timeouts == CL_TRUE) {
2848
CL_LOG(CL_LOG_ERROR,"read timeout for connection completion");
2849
elem->connection->connection_state = CL_CLOSING;
2850
elem->connection->connection_sub_state = CL_COM_DO_SHUTDOWN;
2853
if ( elem->connection->write_buffer_timeout_time != 0) {
2854
if (now.tv_sec >= elem->connection->write_buffer_timeout_time ||
2855
ignore_timeouts == CL_TRUE) {
2856
CL_LOG(CL_LOG_ERROR,"write timeout for connection completion");
2857
elem->connection->connection_state = CL_CLOSING;
2858
elem->connection->connection_sub_state = CL_COM_DO_SHUTDOWN;
2864
if (elem->connection->connection_state == CL_CONNECTED) {
2865
/* check ack timeouts */
2866
int return_value = cl_commlib_handle_connection_ack_timeouts(elem->connection);
2868
if (elem->connection->data_read_flag == CL_COM_DATA_READY &&
2869
elem->connection->ccrm_sent == 0 &&
2870
elem->connection->ccrm_received == 0 ) {
2871
return_value = cl_commlib_handle_connection_read(elem->connection);
2872
if (return_value != CL_RETVAL_OK) {
2873
if (ignore_timeouts == CL_TRUE ||
2874
(return_value != CL_RETVAL_UNCOMPLETE_READ &&
2875
return_value != CL_RETVAL_SELECT_ERROR) ) {
2876
elem->connection->connection_state = CL_CLOSING;
2877
elem->connection->connection_sub_state = CL_COM_DO_SHUTDOWN;
2878
CL_LOG_STR(CL_LOG_ERROR,"read from connection: setting close flag! Reason:", cl_get_error_text(return_value));
2879
snprintf(tmp_string, 1024, MSG_CL_COMMLIB_CLOSING_SSU,
2880
elem->connection->remote->comp_host,
2881
elem->connection->remote->comp_name,
2882
sge_u32c(elem->connection->remote->comp_id));
2883
cl_commlib_push_application_error(CL_LOG_ERROR, return_value,tmp_string);
2887
/* check timeouts */
2888
if ( elem->connection->read_buffer_timeout_time != 0) {
2889
if ( now.tv_sec >= elem->connection->read_buffer_timeout_time ) {
2890
CL_LOG(CL_LOG_ERROR,"connection read timeout");
2891
elem->connection->connection_state = CL_CLOSING;
2892
elem->connection->connection_sub_state = CL_COM_DO_SHUTDOWN;
2895
if ( elem->connection->ccrm_received != 0 ) {
2896
CL_LOG(CL_LOG_WARNING, "will not read from this connection, because ccrm was received!");
2901
if (elem->connection->connection_state == CL_CONNECTED) {
2902
int return_value = CL_RETVAL_OK;
2903
if (elem->connection->fd_ready_for_write == CL_COM_DATA_READY &&
2904
elem->connection->data_write_flag == CL_COM_DATA_READY &&
2905
elem->connection->ccrm_sent == 0 ) {
2906
return_value = cl_commlib_handle_connection_write(elem->connection);
2907
if (return_value != CL_RETVAL_OK) {
2908
if (ignore_timeouts == CL_TRUE ||
2909
(return_value != CL_RETVAL_UNCOMPLETE_WRITE &&
2910
return_value != CL_RETVAL_SELECT_ERROR)) {
2911
elem->connection->connection_state = CL_CLOSING;
2912
elem->connection->connection_sub_state = CL_COM_DO_SHUTDOWN;
2913
CL_LOG_STR(CL_LOG_ERROR,"write to connection: setting close flag! Reason:", cl_get_error_text(return_value));
2914
snprintf(tmp_string, 1024, MSG_CL_COMMLIB_CLOSING_SSU,
2915
elem->connection->remote->comp_host,
2916
elem->connection->remote->comp_name,
2917
sge_u32c(elem->connection->remote->comp_id));
2918
cl_commlib_push_application_error(CL_LOG_ERROR, return_value, tmp_string);
2922
/* check timeouts */
2923
if ( elem->connection->write_buffer_timeout_time != 0) {
2924
if ( now.tv_sec >= elem->connection->write_buffer_timeout_time ) {
2925
CL_LOG(CL_LOG_ERROR,"write timeout for connected endpoint");
2926
snprintf(tmp_string, 1024, MSG_CL_COMMLIB_CLOSING_SSU,
2927
elem->connection->remote->comp_host,
2928
elem->connection->remote->comp_name,
2929
sge_u32c(elem->connection->remote->comp_id));
2930
cl_commlib_push_application_error(CL_LOG_ERROR, CL_RETVAL_SEND_TIMEOUT, tmp_string);
2931
elem->connection->connection_state = CL_CLOSING;
2932
elem->connection->connection_sub_state = CL_COM_DO_SHUTDOWN;
2937
/* send connection close response message (if no message are in buffers) */
2938
if (elem->connection->ccm_received == 1 ) {
2939
CL_LOG(CL_LOG_INFO,"received ccm");
2941
CL_LOG_INT(CL_LOG_INFO,"receive buffer:",(int)cl_raw_list_get_elem_count(elem->connection->received_message_list) );
2942
CL_LOG_INT(CL_LOG_INFO,"send buffer :",(int)cl_raw_list_get_elem_count(elem->connection->send_message_list) );
2944
if (cl_raw_list_get_elem_count(elem->connection->send_message_list) == 0 &&
2945
cl_raw_list_get_elem_count(elem->connection->received_message_list) == 0) {
2946
elem->connection->ccm_received = 2;
2947
elem->connection->connection_sub_state = CL_COM_SENDING_CCRM;
2948
/* disable #if 1 if you want to test a client, that is frozen! */
2949
cl_commlib_send_ccrm_message(elem->connection);
2950
CL_LOG(CL_LOG_INFO,"sending ccrm");
2952
CL_LOG(CL_LOG_WARNING,"can't send ccrm, still messages in buffer");
2956
elem = cl_connection_list_get_next_elem(elem);
2959
/* now take list entry and add it at the end if there are more connections */
2960
/* this must be done in order to NOT prefer the first connection in list */
2961
if (cl_raw_list_get_elem_count(handle->connection_list) > 1) {
2962
elem = cl_connection_list_get_first_elem(handle->connection_list);
2963
cl_raw_list_dechain_elem(handle->connection_list,elem->raw_elem);
2964
cl_raw_list_append_dechained_elem(handle->connection_list,elem->raw_elem);
2967
cl_raw_list_unlock(handle->connection_list);
2970
/* when threads enabled: this is done by cl_com_handle_service_thread() */
2971
if (handle->service_provider == CL_TRUE &&
2972
handle->service_handler->data_read_flag == CL_COM_DATA_READY) {
2974
/* new connection requests */
2975
cl_com_connection_t* new_con = NULL;
2976
cl_com_connection_request_handler(handle->service_handler, &new_con);
2978
if (new_con != NULL) {
2979
handle->statistic->new_connections = handle->statistic->new_connections + 1;
2980
new_con->handler = handle->service_handler->handler;
2981
CL_LOG(CL_LOG_INFO,"adding new client");
2982
new_con->read_buffer_timeout_time = now.tv_sec + handle->open_connection_timeout;
2983
cl_connection_list_append_connection(handle->connection_list, new_con, 1);
2993
#ifdef __CL_FUNCTION__
2994
#undef __CL_FUNCTION__
2996
#define __CL_FUNCTION__ "cl_commlib_handle_connection_read()"
2999
/* WARNING: connection_list must be locked by caller */
3000
static int cl_commlib_handle_connection_read(cl_com_connection_t* connection) {
3001
cl_com_message_t* message = NULL;
3002
cl_message_list_elem_t* message_list_element = NULL;
3003
unsigned long size = 0;
3004
int return_value = CL_RETVAL_OK;
3005
int connect_port = 0;
3007
cl_bool_t new_message_for_queue = CL_FALSE;
3009
if (connection == NULL) {
3010
return CL_RETVAL_PARAMS;
3013
if (connection->data_flow_type == CL_CM_CT_STREAM) {
3014
cl_bool_t is_debug_client = CL_FALSE;
3016
if (connection->remote != NULL) {
3017
if (connection->remote->comp_name != NULL) {
3018
if (strcmp(connection->remote->comp_name, CL_COM_DEBUG_CLIENT_NAME) == 0) {
3019
is_debug_client = CL_TRUE;
3024
if (is_debug_client == CL_TRUE) {
3026
gettimeofday(&now,NULL);
3027
connection->read_buffer_timeout_time = now.tv_sec + connection->handler->read_timeout;
3028
return_value = cl_com_read(connection, &(connection->data_read_buffer[connection->data_read_buffer_pos]), connection->data_buffer_size - connection->data_read_buffer_pos , &size);
3029
connection->read_buffer_timeout_time = 0;
3031
if (return_value != CL_RETVAL_OK && return_value != CL_RETVAL_UNCOMPLETE_READ) {
3032
return return_value;
3035
connection->data_read_buffer_pos += size;
3038
while( connection->data_read_buffer_pos > 0) {
3039
int unparsed_string_start = -1;
3040
for (pos = 0; pos < connection->data_read_buffer_pos; pos++) {
3041
if ( connection->data_read_buffer[pos] == 0 ) {
3042
cl_bool_t changed_mode = CL_FALSE;
3044
unparsed_string_start = pos + 1;
3046
/* TODO: implement clean commando syntax parser for debug_clients, etc. */
3047
if (strcmp("set tag ALL",(char*)connection->data_read_buffer) == 0) {
3048
connection->handler->debug_client_setup->dc_mode = CL_DEBUG_CLIENT_ALL;
3049
changed_mode = CL_TRUE;
3050
} else if (strcmp("set tag MSG",(char*)connection->data_read_buffer) == 0) {
3051
connection->handler->debug_client_setup->dc_mode = CL_DEBUG_CLIENT_MSG;
3052
changed_mode = CL_TRUE;
3053
} else if (strcmp("set tag APP",(char*)connection->data_read_buffer) == 0) {
3054
connection->handler->debug_client_setup->dc_mode = CL_DEBUG_CLIENT_APP;
3055
changed_mode = CL_TRUE;
3056
} else if (strcmp("set dump OFF",(char*)connection->data_read_buffer) == 0) {
3057
connection->handler->debug_client_setup->dc_dump_flag = CL_FALSE;
3058
} else if (strcmp("set dump ON",(char*)connection->data_read_buffer) == 0) {
3059
connection->handler->debug_client_setup->dc_dump_flag = CL_TRUE;
3060
} else if (strcmp("set debug OFF",(char*)connection->data_read_buffer) == 0) {
3061
connection->handler->debug_client_setup->dc_app_log_level = 0;
3062
changed_mode = CL_TRUE;
3063
} else if (strcmp("set debug ERROR",(char*)connection->data_read_buffer) == 0) {
3064
connection->handler->debug_client_setup->dc_app_log_level = 1;
3065
changed_mode = CL_TRUE;
3066
} else if (strcmp("set debug WARNING",(char*)connection->data_read_buffer) == 0) {
3067
connection->handler->debug_client_setup->dc_app_log_level = 2;
3068
changed_mode = CL_TRUE;
3069
} else if (strcmp("set debug INFO",(char*)connection->data_read_buffer) == 0) {
3070
connection->handler->debug_client_setup->dc_app_log_level = 3;
3071
changed_mode = CL_TRUE;
3072
} else if (strcmp("set debug DEBUG",(char*)connection->data_read_buffer) == 0) {
3073
connection->handler->debug_client_setup->dc_app_log_level = 4;
3074
changed_mode = CL_TRUE;
3075
} else if (strcmp("set debug DPRINTF",(char*)connection->data_read_buffer) == 0) {
3076
connection->handler->debug_client_setup->dc_app_log_level = 5;
3077
changed_mode = CL_TRUE;
3080
if (changed_mode == CL_TRUE) {
3081
switch(connection->handler->debug_client_setup->dc_mode) {
3082
case CL_DEBUG_CLIENT_MSG:
3083
case CL_DEBUG_CLIENT_OFF: {
3084
pthread_mutex_lock(&cl_com_debug_client_callback_func_mutex);
3085
if (cl_com_debug_client_callback_func != NULL) {
3086
cl_com_debug_client_callback_func(0, connection->handler->debug_client_setup->dc_app_log_level);
3088
pthread_mutex_unlock(&cl_com_debug_client_callback_func_mutex);
3091
case CL_DEBUG_CLIENT_ALL:
3092
case CL_DEBUG_CLIENT_APP: {
3093
pthread_mutex_lock(&cl_com_debug_client_callback_func_mutex);
3094
if (cl_com_debug_client_callback_func != NULL) {
3095
cl_com_debug_client_callback_func(1, connection->handler->debug_client_setup->dc_app_log_level);
3097
pthread_mutex_unlock(&cl_com_debug_client_callback_func_mutex);
3102
break; /* found complete string */
3105
if (unparsed_string_start != -1) {
3107
/* ok we had a complete string, now remove unused string from buffer */
3108
for (pos = unparsed_string_start; pos < connection->data_read_buffer_pos; pos++) {
3109
connection->data_read_buffer[i++] = connection->data_read_buffer[pos];
3111
connection->data_read_buffer_pos -= unparsed_string_start;
3113
break; /* following string isn't complete */
3117
/* Touch endpoint, he is still active */
3118
if (cl_com_connection_get_connect_port(connection ,&connect_port) == CL_RETVAL_OK) {
3119
cl_endpoint_list_define_endpoint(cl_com_get_endpoint_list(),
3122
connection->auto_close_type, CL_FALSE);
3124
gettimeofday(&connection->last_transfer_time,NULL); /* set receive time */
3125
connection->statistic->bytes_received = connection->statistic->bytes_received + size;
3126
connection->statistic->real_bytes_received = connection->statistic->real_bytes_received + size;
3127
return return_value;
3129
gettimeofday(&now,NULL);
3130
connection->read_buffer_timeout_time = now.tv_sec + connection->handler->read_timeout;
3132
return_value = cl_com_read(connection, connection->data_read_buffer, connection->data_buffer_size, &size);
3134
connection->read_buffer_timeout_time = 0;
3136
if (return_value != CL_RETVAL_OK && return_value != CL_RETVAL_UNCOMPLETE_READ) {
3137
return return_value;
3140
return_value = cl_com_create_message(&message);
3141
if (return_value != CL_RETVAL_OK) {
3142
return return_value;
3145
CL_LOG_STR(CL_LOG_INFO,"received stream message from:", connection->remote->comp_host);
3146
message->message_state = CL_MS_READY; /* set message state */
3147
message->message_mat = CL_MIH_MAT_NAK; /* no acknoledge for stream messages */
3148
message->message_length = size; /* set message size */
3149
gettimeofday(&message->message_receive_time, NULL); /* set receive time */
3151
/* Touch endpoint, he is still active */
3152
if ( cl_com_connection_get_connect_port(connection ,&connect_port) == CL_RETVAL_OK) {
3153
cl_endpoint_list_define_endpoint(cl_com_get_endpoint_list(),
3156
connection->auto_close_type, CL_FALSE );
3159
/* set last transfer time of connection */
3160
memcpy(&connection->last_transfer_time, &message->message_receive_time, sizeof(struct timeval));
3162
message->message = (cl_byte_t*) malloc(sizeof(cl_byte_t) * size);
3163
if (message->message == NULL) {
3164
cl_com_free_message(&message);
3165
return CL_RETVAL_MALLOC;
3167
memcpy(message->message, connection->data_read_buffer , sizeof(cl_byte_t) * size);
3168
return_value = cl_message_list_append_receive(connection, message, 1);
3169
if (return_value == CL_RETVAL_OK) {
3170
if (connection->handler != NULL) {
3171
cl_com_handle_t* handle = connection->handler;
3172
/* increase counter for ready messages */
3173
pthread_mutex_lock(handle->messages_ready_mutex);
3174
handle->messages_ready_for_read = handle->messages_ready_for_read + 1;
3175
cl_app_message_queue_append(handle->received_message_queue, connection, NULL, CL_MIH_MAT_UNDEFINED, NULL, 0,0,0,1);
3176
pthread_mutex_unlock(handle->messages_ready_mutex);
3178
cl_thread_trigger_thread_condition(handle->app_condition, 1);
3182
connection->statistic->bytes_received = connection->statistic->bytes_received + size;
3183
connection->statistic->real_bytes_received = connection->statistic->real_bytes_received + size;
3184
return return_value;
3186
} else if (connection->data_flow_type == CL_CM_CT_MESSAGE) {
3188
cl_raw_list_lock(connection->received_message_list);
3191
/* try to find actual message */
3192
message_list_element = cl_message_list_get_least_elem(connection->received_message_list);
3193
if (message_list_element != NULL) {
3194
message = message_list_element->message;
3195
if (message->message_state == CL_MS_READY || message->message_state == CL_MS_PROTOCOL) {
3196
message = NULL; /* already read, create new message */
3200
if (message == NULL) {
3201
/* create a new message */
3202
return_value = cl_com_create_message(&message);
3203
if (return_value != CL_RETVAL_OK) {
3204
cl_raw_list_unlock(connection->received_message_list);
3205
CL_LOG(CL_LOG_ERROR,"error creating new empty read message");
3206
return return_value;
3208
message->message_state = CL_MS_INIT_RCV;
3209
return_value = cl_message_list_append_receive(connection, message, 0);
3210
if (return_value != CL_RETVAL_OK) {
3211
cl_raw_list_unlock(connection->received_message_list);
3212
CL_LOG(CL_LOG_ERROR,"error appending new empty read message");
3213
return return_value;
3216
CL_LOG(CL_LOG_INFO, "continue previous read for message ...");
3219
if (message->message_state == CL_MS_INIT_RCV) {
3221
CL_LOG(CL_LOG_INFO,"CL_MS_INIT_RCV");
3222
connection->data_read_buffer_pos = 0;
3223
connection->data_read_buffer_processed = 0;
3224
connection->read_gmsh_header->dl = 0;
3225
gettimeofday(&now,NULL);
3226
connection->read_buffer_timeout_time = now.tv_sec + connection->handler->read_timeout;
3227
message->message_rcv_pointer = 0;
3228
message->message_state = CL_MS_RCV_GMSH;
3231
if (message->message_state == CL_MS_RCV_GMSH) {
3232
CL_LOG(CL_LOG_INFO,"CL_MS_RCV_GMSH");
3234
return_value = cl_com_read_GMSH(connection, &size);
3235
if (return_value != CL_RETVAL_OK ) {
3236
/* header is not complete, try later */
3237
cl_raw_list_unlock(connection->received_message_list);
3238
CL_LOG_STR(CL_LOG_INFO,"cl_com_read_GMSH returned", cl_get_error_text(return_value));
3240
/* recalculate timeout when some data was received */
3242
gettimeofday(&now,NULL);
3243
connection->read_buffer_timeout_time = now.tv_sec + connection->handler->read_timeout;
3244
CL_LOG(CL_LOG_INFO,"recalculate read buffer timeout time (CL_MS_RCV_GMSH)");
3246
return return_value;
3248
connection->statistic->real_bytes_received = connection->statistic->real_bytes_received + connection->data_read_buffer_pos;
3249
connection->data_read_buffer_pos = 0;
3250
connection->data_read_buffer_processed = 0;
3251
message->message_state = CL_MS_RCV_MIH;
3254
if (message->message_state == CL_MS_RCV_MIH) {
3255
cl_com_MIH_t* mih_message = NULL;
3256
CL_LOG(CL_LOG_INFO,"CL_MS_RCV_MIH");
3257
size = connection->read_gmsh_header->dl - (connection->data_read_buffer_pos - connection->data_read_buffer_processed);
3258
if ( (size + connection->data_read_buffer_pos) >= connection->data_buffer_size ) {
3259
CL_LOG(CL_LOG_ERROR,"stream buffer to small");
3260
return CL_RETVAL_STREAM_BUFFER_OVERFLOW;
3263
unsigned long data_read = 0;
3264
return_value = cl_com_read(connection, &(connection->data_read_buffer[(connection->data_read_buffer_pos)]), size, &data_read);
3266
connection->data_read_buffer_pos = connection->data_read_buffer_pos + data_read;
3267
if (return_value != CL_RETVAL_OK) {
3268
cl_raw_list_unlock(connection->received_message_list);
3269
CL_LOG_STR(CL_LOG_INFO,"cl_com_read returned:", cl_get_error_text(return_value));
3271
/* recalculate timeout when some data was received */
3272
if (data_read > 0) {
3273
gettimeofday(&now,NULL);
3274
connection->read_buffer_timeout_time = now.tv_sec + connection->handler->read_timeout;
3275
CL_LOG(CL_LOG_INFO,"recalculate read buffer timeout time (CL_MS_RCV_MIH)");
3277
return return_value;
3280
connection->statistic->real_bytes_received = connection->statistic->real_bytes_received + connection->data_read_buffer_pos;
3281
return_value = cl_xml_parse_MIH(&(connection->data_read_buffer[(connection->data_read_buffer_processed)]),connection->read_gmsh_header->dl, &mih_message);
3283
if (return_value != CL_RETVAL_OK ) {
3284
cl_raw_list_unlock(connection->received_message_list);
3285
CL_LOG_STR(CL_LOG_INFO,"cl_xml_parse_MIH returned", cl_get_error_text(return_value));
3286
return return_value;
3289
/* TODO: check version */
3290
message->message_id = mih_message->mid;
3291
message->message_length = mih_message->dl;
3292
message->message_df = mih_message->df;
3293
message->message_mat = mih_message->mat;
3294
message->message_tag = mih_message->tag;
3295
message->message_response_id = mih_message->rid;
3296
cl_com_free_mih_message(&mih_message);
3297
message->message = (cl_byte_t*) malloc( sizeof(cl_byte_t) * message->message_length);
3298
if (message->message == NULL) {
3299
cl_raw_list_unlock(connection->received_message_list);
3300
return CL_RETVAL_MALLOC;
3302
message->message_state = CL_MS_RCV;
3305
if (message->message_state == CL_MS_RCV) {
3306
CL_LOG(CL_LOG_INFO,"CL_MS_RCV");
3309
/* is message already complete received ? */
3310
if (message->message_rcv_pointer < message->message_length) {
3311
return_value = cl_com_read(connection,
3312
&(message->message[message->message_rcv_pointer]),
3313
message->message_length - message->message_rcv_pointer,
3315
message->message_rcv_pointer = message->message_rcv_pointer + size;
3316
if (return_value != CL_RETVAL_OK) {
3317
cl_raw_list_unlock(connection->received_message_list);
3318
CL_LOG_STR(CL_LOG_INFO,"cl_com_read returned:", cl_get_error_text(return_value));
3320
/* recalculate timeout when some data was received */
3322
gettimeofday(&now,NULL);
3323
connection->read_buffer_timeout_time = now.tv_sec + connection->handler->read_timeout;
3324
CL_LOG(CL_LOG_INFO,"recalculate read buffer timeout time (CL_MS_RCV)");
3327
return return_value;
3331
CL_LOG_STR_STR_INT(CL_LOG_INFO, "received message from:", connection->remote->comp_host,
3332
connection->remote->comp_name,
3333
(int)connection->remote->comp_id);
3335
gettimeofday(&message->message_receive_time,NULL);
3336
/* set last transfer time of connection */
3337
memcpy(&connection->last_transfer_time,&message->message_receive_time,sizeof (struct timeval));
3340
connection->read_buffer_timeout_time = 0;
3342
connection->statistic->bytes_received = connection->statistic->bytes_received + message->message_length;
3343
connection->statistic->real_bytes_received = connection->statistic->real_bytes_received + message->message_length;
3345
connection->last_received_message_id = message->message_id;
3347
if (connection->connection_state == CL_CONNECTED) {
3348
int connect_port = 0;
3349
/* Touch endpoint, he is still active */
3350
if (cl_com_connection_get_connect_port(connection,&connect_port) == CL_RETVAL_OK) {
3351
return_value = cl_endpoint_list_define_endpoint(cl_com_get_endpoint_list(),
3354
connection->auto_close_type,
3359
switch(message->message_df) {
3361
CL_LOG(CL_LOG_INFO,"received binary message");
3362
message->message_state = CL_MS_READY;
3364
cl_com_add_debug_message(connection, NULL, message);
3365
/* we have a new message for the read queue */
3366
new_message_for_queue = CL_TRUE;
3369
CL_LOG(CL_LOG_INFO,"received XML message");
3370
message->message_state = CL_MS_READY;
3372
cl_com_add_debug_message(connection, NULL, message);
3373
/* we have a new message for the read queue */
3374
new_message_for_queue = CL_TRUE;
3377
CL_LOG(CL_LOG_INFO,"received protocol message");
3378
message->message_state = CL_MS_PROTOCOL;
3383
#ifdef CL_DO_SEND_ACK_AT_COMMLIB_LAYER
3384
/* send a message acknowledge when message arives at commlib layer */
3385
if (message->message_state == CL_MS_READY) {
3386
/* send acknowledge for CL_MIH_MAT_ACK type (application removed message from buffer) */
3387
if ( message->message_mat == CL_MIH_MAT_ACK) {
3388
cl_commlib_send_ack_message(connection, message);
3391
#endif /* CL_DO_SEND_ACK_AT_COMMLIB_LAYER */
3393
if (message->message_state == CL_MS_PROTOCOL) {
3394
cl_com_AM_t* ack_message = NULL;
3395
cl_com_SIM_t* sim_message = NULL;
3396
cl_com_SIRM_t* sirm_message = NULL;
3397
cl_com_CCM_t* ccm_message = NULL;
3398
cl_com_CCRM_t* ccrm_message = NULL;
3399
cl_com_message_t* sent_message = NULL;
3400
cl_message_list_elem_t* message_list_elem = NULL;
3401
unsigned long run_time = 0;
3403
switch(message->message_df) {
3404
case CL_MIH_DF_SIM: {
3405
return_value = cl_message_list_remove_receive(connection, message, 0);
3406
cl_raw_list_unlock(connection->received_message_list);
3407
if (return_value != CL_RETVAL_OK) {
3408
return return_value;
3410
cl_com_add_debug_message(connection, NULL, message);
3412
return_value = cl_xml_parse_SIM((unsigned char*)message->message, message->message_length, &sim_message);
3413
if (return_value != CL_RETVAL_OK) {
3414
cl_com_free_message(&message);
3415
return return_value;
3417
CL_LOG(CL_LOG_INFO,"received SIM message");
3419
if (connection->handler != NULL) {
3420
cl_com_handle_t* handle = connection->handler;
3421
char* application_info = "not available";
3423
/* we force an statistic update */
3424
cl_commlib_calculate_statistic(handle,CL_TRUE,0);
3425
if ( handle->statistic->application_info != NULL ) {
3426
application_info = handle->statistic->application_info;
3429
gettimeofday(&now,NULL);
3430
run_time = (unsigned long)now.tv_sec - handle->start_time.tv_sec;
3433
cl_commlib_send_sirm_message( connection,
3435
(unsigned long) handle->start_time.tv_sec,
3437
handle->statistic->unread_message_count,
3438
handle->statistic->unsend_message_count,
3439
handle->statistic->nr_of_connections,
3440
handle->statistic->application_status,
3443
cl_commlib_send_sirm_message(connection, message, 0, 0 ,0 ,0 , 0, 0, "get status info error");
3445
cl_com_free_message(&message);
3446
cl_com_free_sim_message(&sim_message);
3447
return CL_RETVAL_OK;
3449
case CL_MIH_DF_SIRM: {
3450
return_value = cl_message_list_remove_receive(connection, message, 0);
3451
cl_raw_list_unlock(connection->received_message_list);
3452
if (return_value != CL_RETVAL_OK) {
3453
return return_value;
3455
cl_com_add_debug_message(connection, NULL, message);
3457
return_value = cl_xml_parse_SIRM((unsigned char*)message->message, message->message_length, &sirm_message);
3458
if (return_value != CL_RETVAL_OK) {
3459
cl_com_free_message(&message);
3460
return return_value;
3462
CL_LOG_INT(CL_LOG_INFO,"received SIRM message for sent SIM message with id:", (int)sirm_message->mid);
3464
/* set sirm flag for sent SIM message with ack id from sirm */
3465
cl_raw_list_lock(connection->send_message_list);
3466
message_list_elem = cl_message_list_get_first_elem(connection->send_message_list);
3467
sent_message = NULL;
3468
while (message_list_elem != NULL) {
3469
sent_message = message_list_elem->message;
3470
if (sent_message->message_state == CL_MS_PROTOCOL) {
3471
if (sent_message->message_id == sirm_message->mid) {
3475
message_list_elem = cl_message_list_get_next_elem(message_list_elem);
3477
if (sent_message == NULL) {
3478
CL_LOG_INT(CL_LOG_ERROR,"got SIRM message for unexpected message id",(int)sirm_message->mid);
3479
cl_com_free_sirm_message(&sirm_message);
3481
CL_LOG_INT(CL_LOG_INFO,"got SIRM message for SIM message id",(int)sirm_message->mid);
3482
sent_message->message_sirm = sirm_message;
3484
cl_raw_list_unlock(connection->send_message_list);
3486
cl_com_free_message(&message);
3487
return CL_RETVAL_OK;
3489
case CL_MIH_DF_AM: {
3490
return_value = cl_message_list_remove_receive(connection, message, 0);
3491
cl_raw_list_unlock(connection->received_message_list);
3492
if (return_value != CL_RETVAL_OK) {
3493
return return_value;
3495
cl_com_add_debug_message(connection, NULL, message);
3497
return_value = cl_xml_parse_AM((unsigned char*)message->message, message->message_length, &ack_message);
3498
if (return_value != CL_RETVAL_OK) {
3499
cl_com_free_message(&message);
3500
return return_value;
3502
CL_LOG_INT(CL_LOG_INFO,"received ACK message. Id of ACK message:", (int)message->message_id);
3504
/* set ack flag for sent message with ack id from send_message_list */
3505
cl_raw_list_lock(connection->send_message_list);
3506
message_list_elem = cl_message_list_get_first_elem(connection->send_message_list);
3507
sent_message = NULL;
3508
while (message_list_elem != NULL) {
3509
sent_message = message_list_elem->message;
3510
if (sent_message->message_state == CL_MS_PROTOCOL) {
3511
if (sent_message->message_id == ack_message->mid) {
3515
message_list_elem = cl_message_list_get_next_elem(message_list_elem);
3517
if (sent_message == NULL) {
3518
CL_LOG_INT(CL_LOG_ERROR,"got ACK message for unexpected message id",(int)ack_message->mid);
3520
CL_LOG_INT(CL_LOG_INFO,"got ACK message for message id",(int)ack_message->mid);
3521
sent_message->message_ack_flag = 1;
3523
cl_raw_list_unlock(connection->send_message_list);
3526
cl_com_free_message(&message);
3527
cl_com_free_am_message(&ack_message);
3528
return CL_RETVAL_OK;
3530
case CL_MIH_DF_CCM: {
3531
return_value = cl_message_list_remove_receive(connection, message,0);
3532
cl_raw_list_unlock(connection->received_message_list);
3533
if (return_value != CL_RETVAL_OK) {
3534
return return_value;
3536
cl_com_add_debug_message(connection, NULL, message);
3538
return_value = cl_xml_parse_CCM((unsigned char*)message->message, message->message_length, &ccm_message);
3539
if (return_value != CL_RETVAL_OK) {
3540
cl_com_free_message(&message);
3541
return return_value;
3543
if (connection->connection_state == CL_CONNECTED) {
3544
if (connection->connection_sub_state == CL_COM_WORK) {
3545
CL_LOG(CL_LOG_INFO,"received connection close message");
3546
connection->ccm_received = 1;
3548
if ( connection->was_accepted == CL_TRUE) {
3549
CL_LOG(CL_LOG_ERROR,"received connection close message from client while sending ccm message - ignoring");
3551
CL_LOG(CL_LOG_INFO,"received connection close message");
3552
connection->ccm_received = 1;
3556
cl_com_free_message(&message);
3557
cl_com_free_ccm_message(&ccm_message);
3558
return CL_RETVAL_OK;
3560
case CL_MIH_DF_CCRM: {
3561
return_value = cl_message_list_remove_receive(connection, message, 0);
3562
cl_raw_list_unlock(connection->received_message_list);
3563
if (return_value != CL_RETVAL_OK) {
3564
return return_value;
3566
cl_com_add_debug_message(connection, NULL, message);
3568
return_value = cl_xml_parse_CCRM((unsigned char*)message->message, message->message_length, &ccrm_message);
3569
if (return_value != CL_RETVAL_OK) {
3570
cl_com_free_message(&message);
3571
return return_value;
3573
if (connection->connection_state == CL_CONNECTED) {
3574
CL_LOG(CL_LOG_INFO,"received connection close response message");
3575
connection->ccrm_received = 1;
3576
connection->connection_sub_state = CL_COM_DONE; /* CLOSE message */
3577
connection->data_write_flag = CL_COM_DATA_NOT_READY;
3580
cl_com_free_message(&message);
3581
cl_com_free_ccrm_message(&ccrm_message);
3582
return CL_RETVAL_OK;
3585
CL_LOG(CL_LOG_ERROR,"received unsupported protocol message");
3586
return_value = cl_message_list_remove_receive(connection, message, 0);
3587
if (return_value == CL_RETVAL_OK) {
3588
cl_com_free_message(&message);
3590
cl_com_add_debug_message(connection, NULL, message);
3597
cl_raw_list_unlock(connection->received_message_list);
3599
if (new_message_for_queue == CL_TRUE) {
3600
if (connection->handler != NULL) {
3601
/* increase counter for ready messages */
3602
pthread_mutex_lock(connection->handler->messages_ready_mutex);
3603
connection->handler->messages_ready_for_read = connection->handler->messages_ready_for_read + 1;
3604
cl_app_message_queue_append(connection->handler->received_message_queue, connection, NULL, CL_MIH_MAT_UNDEFINED, NULL, 0,0,0,1);
3605
pthread_mutex_unlock(connection->handler->messages_ready_mutex);
3607
cl_thread_trigger_thread_condition(connection->handler->app_condition, 1);
3612
if (connection->ccm_received == 1) {
3614
CL_LOG_INT(CL_LOG_WARNING,"receive buffer:",(int)cl_raw_list_get_elem_count(connection->received_message_list) );
3615
CL_LOG_INT(CL_LOG_WARNING,"send buffer :",(int)cl_raw_list_get_elem_count(connection->send_message_list) );
3616
CL_LOG_INT(CL_LOG_WARNING,"ccm_received :",(int)connection->ccm_received);
3618
if( cl_raw_list_get_elem_count(connection->send_message_list) == 0 &&
3619
cl_raw_list_get_elem_count(connection->received_message_list) == 0 ) {
3620
connection->ccm_received = 2;
3621
connection->connection_sub_state = CL_COM_SENDING_CCRM;
3622
/* disable #if 1 if you want to test a client, that is frozen! */
3623
cl_commlib_send_ccrm_message(connection);
3624
CL_LOG(CL_LOG_WARNING,"sending ccrm");
3626
CL_LOG(CL_LOG_WARNING,"can't send ccrm, still messages in buffer");
3630
return return_value;
3634
#ifdef __CL_FUNCTION__
3635
#undef __CL_FUNCTION__
3637
#define __CL_FUNCTION__ "cl_commlib_handle_connection_ack_timeouts()"
3639
/* This function removes all messages in sent_message_list in protocol state where
3640
the ack timeout is reached (e.g. ACK messages and SIM messages)*/
3641
static int cl_commlib_handle_connection_ack_timeouts(cl_com_connection_t* connection) {
3642
cl_com_message_t* message = NULL;
3643
cl_message_list_elem_t* message_list_elem = NULL;
3644
cl_message_list_elem_t* next_message_list_elem = NULL;
3646
int return_value = CL_RETVAL_OK;
3648
if (connection == NULL) {
3649
return CL_RETVAL_PARAMS;
3653
/* stream connections have no ack messages */
3654
if (connection->data_flow_type == CL_CM_CT_STREAM) {
3655
return CL_RETVAL_OK;
3658
if (connection->data_flow_type == CL_CM_CT_MESSAGE) {
3659
long timeout_time = 0;
3660
cl_bool_t ignore_timeouts = CL_FALSE;
3663
* This code will send a sim message to the connection in order to test the connection
3664
* availability(Part 1/2).
3666
if (connection->check_endpoint_flag == CL_TRUE && connection->check_endpoint_mid == 0) {
3667
if (connection->ccm_received == 0 &&
3668
connection->connection_state == CL_CONNECTED &&
3669
connection->connection_sub_state == CL_COM_WORK) {
3670
cl_commlib_send_sim_message(connection, &(connection->check_endpoint_mid));
3671
CL_LOG(CL_LOG_WARNING, "sending sim to connection to check its availability");
3675
#ifdef CL_DO_COMMLIB_DEBUG
3676
CL_LOG(CL_LOG_INFO,"checking timeouts for ack messages");
3678
/* lock received messages list */
3679
cl_raw_list_lock(connection->send_message_list);
3681
/* get current timeout time */
3682
gettimeofday(&now,NULL);
3683
ignore_timeouts = cl_com_get_ignore_timeouts_flag();
3685
message_list_elem = cl_message_list_get_first_elem(connection->send_message_list);
3686
while(message_list_elem != NULL) {
3687
message = message_list_elem->message;
3688
next_message_list_elem = cl_message_list_get_next_elem(message_list_elem);
3689
if (message->message_state == CL_MS_PROTOCOL) {
3692
* Check out for SIRM's sent to connections when check_endpoint_flag was TRUE
3693
* This is Part 2/2 for connection alive tests.
3695
if (message->message_id == connection->check_endpoint_mid && connection->check_endpoint_mid != 0) {
3696
if (message->message_sirm != NULL) {
3697
CL_LOG(CL_LOG_INFO,"got sirm from checked connection");
3698
cl_message_list_remove_send(connection, message, 0);
3699
CL_LOG_INT(CL_LOG_INFO,"endpoint runtime:", (int)message->message_sirm->runtime );
3700
if (message->message_sirm->info != NULL) {
3701
CL_LOG_STR(CL_LOG_INFO,"endpoint info: ", message->message_sirm->info);
3703
cl_com_free_message(&message);
3704
connection->check_endpoint_mid = 0;
3705
connection->check_endpoint_flag = CL_FALSE; /* now the next check can be done */
3706
message_list_elem = next_message_list_elem;
3710
timeout_time = (message->message_send_time).tv_sec + connection->handler->acknowledge_timeout;
3711
if ( timeout_time <= now.tv_sec ) {
3712
CL_LOG_INT(CL_LOG_ERROR,"ack timeout for message",(int) message->message_id);
3713
if (message->message_id == connection->check_endpoint_mid && connection->check_endpoint_mid != 0) {
3714
connection->check_endpoint_mid = 0;
3715
connection->check_endpoint_flag = CL_FALSE; /* now the next check can be done */
3717
cl_message_list_remove_send(connection, message, 0);
3718
cl_com_free_message(&message);
3720
if ( ignore_timeouts == CL_TRUE) {
3721
if ( connection->connection_state == CL_CONNECTED &&
3722
connection->connection_sub_state == CL_COM_WORK ) {
3723
CL_LOG(CL_LOG_INFO,"ignore ack timeout flag is set, but this connection is connected and waiting for ack - continue waiting");
3725
CL_LOG(CL_LOG_INFO,"ignore ack timeout flag is set and connection is not connected - ignore timeout");
3726
if (message->message_id == connection->check_endpoint_mid && connection->check_endpoint_mid != 0) {
3727
connection->check_endpoint_mid = 0;
3728
connection->check_endpoint_flag = CL_FALSE; /* now the next check can be done */
3730
cl_message_list_remove_send(connection, message, 0);
3731
cl_com_free_message(&message);
3736
message_list_elem = next_message_list_elem;
3738
/* unlock received messages list */
3739
cl_raw_list_unlock(connection->send_message_list);
3741
return return_value;
3745
#ifdef __CL_FUNCTION__
3746
#undef __CL_FUNCTION__
3748
#define __CL_FUNCTION__ "cl_commlib_check_connection_count()"
3749
static int cl_commlib_check_connection_count(cl_com_handle_t* handle) {
3750
cl_connection_list_elem_t* elem = NULL;
3752
if (handle == NULL) {
3753
return CL_RETVAL_PARAMS;
3756
/* check max open connections */
3757
if (handle->max_con_close_mode != CL_ON_MAX_COUNT_OFF) {
3759
/* first lock the connection list */
3760
cl_raw_list_lock(handle->connection_list);
3762
/* check if we exceed the max. connection count */
3763
if ( cl_raw_list_get_elem_count(handle->connection_list) >= handle->max_open_connections ) {
3765
/* we have reached max connection count, set flag */
3766
if ( handle->max_connection_count_reached == CL_FALSE) {
3767
handle->max_connection_count_reached = CL_TRUE;
3768
CL_LOG(CL_LOG_ERROR,"max open connection count reached");
3771
/* still haven't found a connection to close */
3772
if ( handle->max_connection_count_found_connection_to_close == CL_FALSE ) {
3773
cl_com_connection_t* oldest_connection = NULL;
3775
if ( handle->max_con_close_mode == CL_ON_MAX_COUNT_CLOSE_AUTOCLOSE_CLIENTS ) {
3776
/* try to find the oldest connected connection of type CL_CM_CT_MESSAGE */
3777
elem = cl_connection_list_get_first_elem(handle->connection_list);
3779
if (elem->connection->data_flow_type == CL_CM_CT_MESSAGE &&
3780
elem->connection->connection_state == CL_CONNECTED &&
3781
elem->connection->connection_sub_state == CL_COM_WORK &&
3782
elem->connection->auto_close_type == CL_CM_AC_ENABLED &&
3783
elem->connection != handle->last_receive_message_connection ) {
3785
/* we haven't selected an elem till now, take the first one */
3786
if (oldest_connection == NULL) {
3787
oldest_connection = elem->connection;
3789
/* check if the actual elem is older than oldest_connection */
3790
if (elem->connection->last_transfer_time.tv_sec < oldest_connection->last_transfer_time.tv_sec) {
3791
oldest_connection = elem->connection;
3793
if (elem->connection->last_transfer_time.tv_sec == oldest_connection->last_transfer_time.tv_sec) {
3794
if (elem->connection->last_transfer_time.tv_usec < oldest_connection->last_transfer_time.tv_usec) {
3795
oldest_connection = elem->connection;
3801
elem = cl_connection_list_get_next_elem(elem);
3805
/* close connection ( if found ) */
3806
if ( oldest_connection != NULL ) {
3807
cl_commlib_send_ccm_message(oldest_connection);
3808
oldest_connection->connection_sub_state = CL_COM_SENDING_CCM;
3809
handle->max_connection_count_found_connection_to_close = CL_TRUE;
3810
CL_LOG_STR(CL_LOG_WARNING,"closing connection to host:", oldest_connection->remote->comp_host );
3811
CL_LOG_STR(CL_LOG_WARNING,"component name: ", oldest_connection->remote->comp_name );
3812
CL_LOG_INT(CL_LOG_WARNING,"component id: ", (int)oldest_connection->remote->comp_id );
3814
CL_LOG(CL_LOG_WARNING,"can't close any connection");
3815
handle->max_connection_count_found_connection_to_close = CL_FALSE;
3819
/* we have found a connection to close, check if closing is still in progress */
3820
if ( handle->max_connection_count_found_connection_to_close == CL_TRUE ) {
3821
int is_still_in_list = 0;
3822
elem = cl_connection_list_get_first_elem(handle->connection_list);
3824
if (elem->connection->data_flow_type == CL_CM_CT_MESSAGE &&
3825
elem->connection->connection_state == CL_CONNECTED &&
3826
elem->connection->connection_sub_state != CL_COM_WORK ) {
3827
CL_LOG_STR(CL_LOG_WARNING,"processing close of connection to host:", elem->connection->remote->comp_host );
3828
CL_LOG_STR(CL_LOG_WARNING,"component name: ", elem->connection->remote->comp_name );
3829
CL_LOG_INT(CL_LOG_WARNING,"component id: ", (int)elem->connection->remote->comp_id );
3830
is_still_in_list = 1;
3833
elem = cl_connection_list_get_next_elem(elem);
3835
if ( is_still_in_list == 0) {
3836
handle->max_connection_count_found_connection_to_close = CL_FALSE;
3838
CL_LOG(CL_LOG_WARNING,"still waiting for closing of connection");
3842
/* we have enough free connections, check if connection count reached flag is set */
3843
if (handle->max_connection_count_reached == CL_TRUE) {
3844
handle->max_connection_count_reached = CL_FALSE;
3845
handle->max_connection_count_found_connection_to_close = CL_FALSE;
3846
CL_LOG(CL_LOG_ERROR,"new connections enabled again");
3849
cl_raw_list_unlock(handle->connection_list);
3851
return CL_RETVAL_OK;
3853
#ifdef __CL_FUNCTION__
3854
#undef __CL_FUNCTION__
3856
#define __CL_FUNCTION__ "cl_commlib_handle_debug_clients()"
3857
static int cl_commlib_handle_debug_clients(cl_com_handle_t* handle, cl_bool_t lock_list) {
3859
cl_connection_list_elem_t* elem = NULL;
3860
cl_string_list_elem_t* string_elem = NULL;
3861
char* log_string = NULL;
3862
cl_bool_t list_empty = CL_FALSE;
3863
cl_bool_t flushed_client = CL_FALSE;
3864
cl_bool_t had_data_to_flush = CL_FALSE;
3867
if (handle == NULL) {
3868
CL_LOG(CL_LOG_ERROR,"no handle specified");
3869
return CL_RETVAL_PARAMS;
3872
if (handle->debug_client_setup->dc_mode == CL_DEBUG_CLIENT_OFF) {
3873
CL_LOG(CL_LOG_INFO,"debug clients not enabled");
3874
return CL_RETVAL_DEBUG_CLIENTS_NOT_ENABLED;
3877
if (handle->debug_client_setup->dc_debug_list == NULL) {
3878
CL_LOG(CL_LOG_INFO,"debug clients not supported");
3879
return CL_RETVAL_UNKNOWN;
3882
if (lock_list == CL_TRUE) {
3883
cl_raw_list_lock(handle->connection_list);
3886
cl_raw_list_lock(handle->debug_client_setup->dc_debug_list);
3887
CL_LOG_INT(CL_LOG_INFO, "elements to flush:", (int)cl_raw_list_get_elem_count(handle->debug_client_setup->dc_debug_list));
3888
cl_raw_list_unlock(handle->debug_client_setup->dc_debug_list);
3890
while(list_empty == CL_FALSE) {
3892
cl_raw_list_lock(handle->debug_client_setup->dc_debug_list);
3893
string_elem = cl_string_list_get_first_elem(handle->debug_client_setup->dc_debug_list);
3894
if (string_elem != NULL) {
3895
cl_raw_list_remove_elem(handle->debug_client_setup->dc_debug_list, string_elem->raw_elem);
3896
log_string = string_elem->string;
3897
had_data_to_flush = CL_TRUE;
3900
list_empty = CL_TRUE;
3902
cl_raw_list_unlock(handle->debug_client_setup->dc_debug_list);
3904
if (log_string != NULL) {
3905
elem = cl_connection_list_get_first_elem(handle->connection_list);
3907
cl_com_connection_t* connection = elem->connection;
3908
if (connection->data_flow_type == CL_CM_CT_STREAM && connection->connection_state == CL_CONNECTED) {
3909
if (strcmp(connection->remote->comp_name, CL_COM_DEBUG_CLIENT_NAME) == 0) {
3910
cl_com_message_t* message = NULL;
3911
char* message_text = strdup(log_string);
3913
/* flush debug client */
3914
if (message_text != NULL) {
3915
CL_LOG_STR_STR_INT(CL_LOG_INFO, "flushing debug client:",
3916
connection->remote->comp_host,
3917
connection->remote->comp_name,
3918
(int)connection->remote->comp_id);
3920
cl_raw_list_lock(connection->send_message_list);
3922
cl_com_setup_message(&message,
3924
(cl_byte_t*) message_text,
3925
strlen(message_text),
3929
cl_message_list_append_send(connection,message,0);
3930
cl_raw_list_unlock(connection->send_message_list);
3931
flushed_client = CL_TRUE;
3935
elem = cl_connection_list_get_next_elem(elem);
3942
if ( had_data_to_flush == CL_TRUE && flushed_client == CL_FALSE ) {
3943
/* no connected debug clients, turn off debug message flushing */
3944
CL_LOG(CL_LOG_ERROR,"disable debug client message creation");
3945
handle->debug_client_setup->dc_mode = CL_DEBUG_CLIENT_OFF;
3946
pthread_mutex_lock(&cl_com_debug_client_callback_func_mutex);
3947
if (cl_com_debug_client_callback_func != NULL) {
3948
cl_com_debug_client_callback_func(0, handle->debug_client_setup->dc_app_log_level);
3950
pthread_mutex_unlock(&cl_com_debug_client_callback_func_mutex);
3953
if (lock_list == CL_TRUE) {
3954
cl_raw_list_unlock(handle->connection_list);
3957
if ( flushed_client == CL_TRUE ) {
3958
switch(cl_com_create_threads) {
3960
CL_LOG(CL_LOG_INFO,"no threads enabled");
3961
/* we just want to trigger write , no wait for read*/
3962
cl_commlib_trigger(handle, 1);
3965
/* we just want to trigger write , no wait for read*/
3966
CL_LOG(CL_LOG_INFO,"trigger write thread");
3967
cl_thread_trigger_event(handle->write_thread);
3971
return CL_RETVAL_OK;
3974
#ifdef __CL_FUNCTION__
3975
#undef __CL_FUNCTION__
3977
#define __CL_FUNCTION__ "cl_com_get_actual_statistic_data()"
3978
int cl_com_get_actual_statistic_data(cl_com_handle_t* handle, cl_com_handle_statistic_t** statistics) {
3979
int ret_val = CL_RETVAL_OK;
3980
if (handle == NULL || statistics == NULL || *statistics != NULL) {
3981
return CL_RETVAL_PARAMS;
3984
*statistics = (cl_com_handle_statistic_t*)malloc(sizeof(cl_com_handle_statistic_t));
3985
if (*statistics == NULL) {
3986
return CL_RETVAL_MALLOC;
3989
cl_raw_list_lock(handle->connection_list);
3990
if ( (ret_val=cl_commlib_calculate_statistic(handle, CL_TRUE, 0)) == CL_RETVAL_OK) {
3991
memcpy(*statistics, handle->statistic, sizeof(cl_com_handle_statistic_t));
3992
(*statistics)->application_info = NULL;
3993
if (handle->statistic->application_info != NULL) {
3994
(*statistics)->application_info = strdup(handle->statistic->application_info);
3996
(*statistics)->application_info = strdup("not available");
3999
cl_raw_list_unlock(handle->connection_list);
4001
if ((*statistics)->application_info == NULL) {
4002
cl_com_free_handle_statistic(statistics);
4003
return CL_RETVAL_MALLOC;
4009
#ifdef __CL_FUNCTION__
4010
#undef __CL_FUNCTION__
4012
#define __CL_FUNCTION__ "cl_com_set_application_debug_client_callback_func()"
4013
int cl_com_set_application_debug_client_callback_func(cl_app_debug_client_func_t debug_client_callback_func ) {
4014
pthread_mutex_lock(&cl_com_debug_client_callback_func_mutex);
4015
cl_com_debug_client_callback_func = *debug_client_callback_func;
4016
pthread_mutex_unlock(&cl_com_debug_client_callback_func_mutex);
4017
return CL_RETVAL_OK;
4021
#ifdef __CL_FUNCTION__
4022
#undef __CL_FUNCTION__
4024
#define __CL_FUNCTION__ "cl_com_default_application_debug_client_callback()"
4025
static void cl_com_default_application_debug_client_callback(int dc_connected, int debug_level) {
4026
if (dc_connected == 1) {
4027
CL_LOG(CL_LOG_INFO,"a application debug client is connected");
4029
CL_LOG(CL_LOG_INFO,"no application debug client connected");
4032
CL_LOG_INT(CL_LOG_INFO,"debug level is:", debug_level);
4035
#ifdef __CL_FUNCTION__
4036
#undef __CL_FUNCTION__
4038
#define __CL_FUNCTION__ "cl_com_application_debug()"
4039
int cl_com_application_debug(cl_com_handle_t* handle, const char* message) {
4040
#define CL_DEBUG_DMT_APP_MESSAGE_FORMAT_STRING "%lu\t%.6f\t%s\n"
4041
int ret_val = CL_RETVAL_OK;
4042
double time_now = 0.0;
4043
char* dm_buffer = NULL;
4044
unsigned long dm_buffer_len = 0;
4045
cl_com_debug_message_tag_t debug_message_tag = CL_DMT_APP_MESSAGE;
4048
if (handle == NULL || message == NULL) {
4049
return CL_RETVAL_PARAMS;
4052
/* don't add default case for this switch! */
4053
switch(handle->debug_client_setup->dc_mode) {
4054
case CL_DEBUG_CLIENT_OFF:
4055
case CL_DEBUG_CLIENT_MSG:
4056
return CL_RETVAL_DEBUG_CLIENTS_NOT_ENABLED;
4057
case CL_DEBUG_CLIENT_APP:
4058
case CL_DEBUG_CLIENT_ALL:
4062
gettimeofday(&now,NULL);
4063
time_now = now.tv_sec + (now.tv_usec / 1000000.0);
4065
dm_buffer_len += cl_util_get_ulong_number_length((unsigned long)debug_message_tag);
4066
dm_buffer_len += cl_util_get_double_number_length(time_now);
4067
dm_buffer_len += strlen(message);
4068
dm_buffer_len += strlen(CL_DEBUG_DMT_APP_MESSAGE_FORMAT_STRING);
4071
dm_buffer = (char*) malloc(sizeof(char)*dm_buffer_len);
4072
if (dm_buffer == NULL) {
4073
return CL_RETVAL_MALLOC;
4077
snprintf(dm_buffer,dm_buffer_len,CL_DEBUG_DMT_APP_MESSAGE_FORMAT_STRING,
4078
(unsigned long)debug_message_tag,
4083
* remove all "\n" execpt the last one, because any additional "\n" in application message
4084
* would break qping message format parsing.
4086
for(i=dm_buffer_len - 1 ; i > 0 ; i--) {
4087
if ( dm_buffer[i] == '\n' ) {
4088
if (found_last == 1) {
4094
ret_val = cl_string_list_append_string(handle->debug_client_setup->dc_debug_list, dm_buffer , 1);
4103
#ifdef __CL_FUNCTION__
4104
#undef __CL_FUNCTION__
4106
#define __CL_FUNCTION__ "cl_commlib_calculate_statistic()"
4107
static int cl_commlib_calculate_statistic(cl_com_handle_t* handle, cl_bool_t force_update ,int lock_list) {
4108
cl_connection_list_elem_t* elem = NULL;
4110
double handle_time_last = 0.0;
4111
double handle_time_now = 0.0;
4112
double handle_time_range = 0.0;
4113
double con_per_second = 0.0;
4114
double kbits_sent = 0.0;
4115
double kbits_received = 0.0;
4116
double real_kbits_sent = 0.0;
4117
double real_kbits_received = 0.0;
4118
double send_pay_load = 0.0;
4119
double receive_pay_load = 0.0;
4122
cl_com_con_statistic_t* con_stat = NULL;
4123
if (handle == NULL) {
4124
CL_LOG(CL_LOG_ERROR,"no handle specified");
4125
return CL_RETVAL_PARAMS;
4128
if (lock_list != 0) {
4129
cl_raw_list_lock(handle->connection_list);
4131
gettimeofday(&now,NULL);
4133
handle_time_now = now.tv_sec + (now.tv_usec / 1000000.0);
4134
handle_time_last = handle->statistic->last_update.tv_sec + (handle->statistic->last_update.tv_usec / 1000000.0 );
4135
handle_time_range = handle_time_now - handle_time_last;
4137
if ( force_update == CL_FALSE && handle_time_range < 60.0 ) {
4138
/* only update once per minute */
4139
CL_LOG_INT(CL_LOG_DEBUG, "skipping statistic update, time till next update:", (int) (60 - (int)handle_time_range));
4140
if (lock_list != 0) {
4141
cl_raw_list_unlock(handle->connection_list);
4143
return CL_RETVAL_OK;
4146
CL_LOG(CL_LOG_INFO, "performing statistic update");
4147
gettimeofday(&(handle->statistic->last_update),NULL);
4150
/* get application status */
4151
pthread_mutex_lock(&cl_com_application_mutex);
4152
handle->statistic->application_status = 99999;
4153
if (cl_com_application_status_func != NULL) {
4154
if ( handle->statistic->application_info != NULL ) {
4155
free(handle->statistic->application_info);
4156
handle->statistic->application_info = NULL;
4158
handle->statistic->application_status = cl_com_application_status_func(&(handle->statistic->application_info));
4160
pthread_mutex_unlock(&cl_com_application_mutex);
4162
con_per_second = handle->statistic->new_connections / handle_time_range;
4164
handle->statistic->new_connections = 0;
4165
handle->statistic->unsend_message_count = 0;
4166
handle->statistic->unread_message_count = 0;
4167
handle->statistic->nr_of_connections = cl_raw_list_get_elem_count(handle->connection_list);
4168
elem = cl_connection_list_get_first_elem(handle->connection_list);
4170
/* elem->connection */
4171
con_stat = elem->connection->statistic;
4173
handle->statistic->bytes_sent = handle->statistic->bytes_sent + con_stat->bytes_sent;
4174
handle->statistic->real_bytes_sent = handle->statistic->real_bytes_sent + con_stat->real_bytes_sent;
4175
handle->statistic->bytes_received = handle->statistic->bytes_received + con_stat->bytes_received;
4176
handle->statistic->real_bytes_received = handle->statistic->real_bytes_received + con_stat->real_bytes_received;
4177
con_stat->bytes_sent = 0;
4178
con_stat->bytes_received = 0;
4179
con_stat->real_bytes_sent = 0;
4180
con_stat->real_bytes_received = 0;
4182
handle->statistic->unsend_message_count = handle->statistic->unsend_message_count + cl_raw_list_get_elem_count(elem->connection->send_message_list);
4183
handle->statistic->unread_message_count = handle->statistic->unread_message_count + cl_raw_list_get_elem_count(elem->connection->received_message_list);
4185
elem = cl_connection_list_get_next_elem(elem);
4187
if (handle_time_range > 0.0) {
4188
kbits_sent = ( (handle->statistic->bytes_sent / 1024.0) * 8.0) / handle_time_range;
4189
kbits_received = ( (handle->statistic->bytes_received / 1024.0) * 8.0) / handle_time_range;
4190
real_kbits_sent = ( (handle->statistic->real_bytes_sent / 1024.0) * 8.0) / handle_time_range;
4191
real_kbits_received = ( (handle->statistic->real_bytes_received / 1024.0) * 8.0) / handle_time_range;
4194
if (real_kbits_sent > 0.0) {
4195
send_pay_load = kbits_sent / real_kbits_sent;
4197
if (real_kbits_received > 0.0) {
4198
receive_pay_load = kbits_received / real_kbits_received;
4202
snprintf(help,256," %.3f", handle_time_range);
4203
CL_LOG_STR(CL_LOG_INFO,"time_range:",help);
4205
snprintf(help,256," %.3f", con_per_second );
4206
CL_LOG_STR(CL_LOG_INFO,"new connections/sec:",help);
4208
snprintf(help,256," %.3f", send_pay_load);
4209
CL_LOG_STR(CL_LOG_INFO,"sent ratio:",help);
4210
snprintf(help,256," %.3f", kbits_sent);
4211
CL_LOG_STR(CL_LOG_INFO,"sent kbit/s:",help);
4212
snprintf(help,256," %.3f", real_kbits_sent);
4213
CL_LOG_STR(CL_LOG_INFO,"real sent kbit/s:",help);
4216
snprintf(help,256," %.3f", receive_pay_load);
4217
CL_LOG_STR(CL_LOG_INFO,"receive ratio:",help);
4218
snprintf(help,256," %.3f", kbits_received);
4219
CL_LOG_STR(CL_LOG_INFO,"received kbit/s:",help);
4220
snprintf(help,256," %.3f", real_kbits_received);
4221
CL_LOG_STR(CL_LOG_INFO,"real received kbit/s:",help);
4224
snprintf(help,256," %.3f" , (double) handle->statistic->bytes_sent / 1024.0);
4225
CL_LOG_STR(CL_LOG_INFO,"sent kbyte:",help);
4226
snprintf(help,256," %.3f" , (double) handle->statistic->real_bytes_sent / 1024.0);
4227
CL_LOG_STR(CL_LOG_INFO,"real sent kbyte:",help);
4231
snprintf(help,256," %.3f" , (double)handle->statistic->bytes_received / 1024.0);
4232
CL_LOG_STR(CL_LOG_INFO,"received kbyte:",help);
4233
snprintf(help,256," %.3f" , (double)handle->statistic->real_bytes_received / 1024.0);
4234
CL_LOG_STR(CL_LOG_INFO,"real received kbyte:",help);
4238
snprintf(help,256," %ld" , handle->statistic->unsend_message_count);
4239
CL_LOG_STR(CL_LOG_INFO,"unsend_message_count:",help);
4241
snprintf(help,256," %ld" , handle->statistic->unread_message_count);
4242
CL_LOG_STR(CL_LOG_INFO,"unread_message_count:",help);
4244
snprintf(help,256," %ld" , handle->statistic->nr_of_connections);
4245
CL_LOG_STR(CL_LOG_INFO,"open connections:",help);
4247
snprintf(help,256," %ld" , handle->statistic->application_status);
4248
CL_LOG_STR(CL_LOG_INFO,"application state:",help);
4250
if ( handle->statistic->application_info != NULL ) {
4251
snprintf(help,256," %s" , handle->statistic->application_info);
4252
CL_LOG_STR(CL_LOG_INFO,"application state:",help);
4255
handle->statistic->bytes_sent = 0;
4256
handle->statistic->bytes_received = 0;
4257
handle->statistic->real_bytes_sent = 0;
4258
handle->statistic->real_bytes_received = 0;
4260
if (lock_list != 0) {
4261
cl_raw_list_unlock(handle->connection_list);
4263
return CL_RETVAL_OK;
4267
#ifdef __CL_FUNCTION__
4268
#undef __CL_FUNCTION__
4270
#define __CL_FUNCTION__ "cl_commlib_finish_request_completeness()"
4271
static int cl_commlib_finish_request_completeness(cl_com_connection_t* connection) {
4272
if (connection == NULL) {
4273
return CL_RETVAL_PARAMS;
4276
/* reset buffer variables (used for STREAM debug_clients) */
4277
connection->data_write_buffer_pos = 0;
4278
connection->data_write_buffer_processed = 0;
4279
connection->data_write_buffer_to_send = 0;
4281
connection->data_read_buffer_processed = 0;
4282
connection->data_read_buffer_pos = 0;
4284
if (connection->was_accepted == CL_TRUE) {
4285
int connect_port = 0;
4286
if (cl_com_connection_get_connect_port(connection, &connect_port) == CL_RETVAL_OK) {
4287
if (connect_port > 0) {
4288
CL_LOG_STR(CL_LOG_INFO,"comp_host :", connection->remote->comp_host);
4289
CL_LOG_STR(CL_LOG_INFO,"comp_name :", connection->remote->comp_name);
4290
CL_LOG_INT(CL_LOG_INFO,"comp_id :", (int)connection->remote->comp_id);
4291
CL_LOG_INT(CL_LOG_INFO,"new connected client can be reached at port", (int)connect_port);
4292
if (connection->auto_close_type == CL_CM_AC_ENABLED) {
4293
CL_LOG(CL_LOG_INFO,"new connected client supports auto close");
4295
cl_com_append_known_endpoint(connection->remote, connect_port, connection->auto_close_type, CL_FALSE);
4298
CL_LOG(CL_LOG_INFO,"client does not provide service port");
4303
return CL_RETVAL_OK;
4308
#ifdef __CL_FUNCTION__
4309
#undef __CL_FUNCTION__
4311
#define __CL_FUNCTION__ "cl_commlib_handle_connection_write()"
4312
static int cl_commlib_handle_connection_write(cl_com_connection_t* connection) {
4313
cl_com_message_t* message = NULL;
4314
cl_message_list_elem_t* message_list_elem = NULL;
4315
cl_message_list_elem_t* next_message_list_elem = NULL;
4316
int return_value = CL_RETVAL_OK;
4318
int connect_port = 0;
4320
if (connection == NULL) {
4321
return CL_RETVAL_PARAMS;
4324
if (connection->data_flow_type == CL_CM_CT_STREAM) {
4326
cl_raw_list_lock(connection->send_message_list);
4328
message_list_elem = cl_message_list_get_first_elem(connection->send_message_list);
4329
if (message_list_elem != NULL) {
4330
message = message_list_elem->message;
4332
connection->data_write_flag = CL_COM_DATA_NOT_READY;
4333
cl_raw_list_unlock(connection->send_message_list);
4334
return CL_RETVAL_OK;
4337
if (message->message_state == CL_MS_INIT_SND) {
4338
message->message_snd_pointer = 0;
4339
message->message_state = CL_MS_SND;
4340
gettimeofday(&now,NULL);
4341
connection->write_buffer_timeout_time = now.tv_sec + connection->handler->write_timeout;
4344
if (message->message_state == CL_MS_SND) {
4345
unsigned long written = 0;
4346
return_value = cl_com_write(connection,
4347
&(message->message[message->message_snd_pointer]),
4348
message->message_length - message->message_snd_pointer,
4350
message->message_snd_pointer = message->message_snd_pointer + written;
4351
if (return_value != CL_RETVAL_OK) {
4352
cl_raw_list_unlock(connection->send_message_list);
4353
return return_value;
4355
if (message->message_snd_pointer == message->message_length) {
4356
message->message_state = CL_MS_READY;
4358
cl_raw_list_unlock(connection->send_message_list);
4359
return CL_RETVAL_SEND_ERROR;
4363
if (message->message_state == CL_MS_READY) {
4364
connection->write_buffer_timeout_time = 0;
4365
connection->statistic->bytes_sent = connection->statistic->bytes_sent + message->message_length;
4366
connection->statistic->real_bytes_sent = connection->statistic->real_bytes_sent + message->message_length;
4367
if ( cl_message_list_remove_send(connection, message, 0) == CL_RETVAL_OK) {
4368
cl_com_free_message(&message);
4371
if (cl_message_list_get_first_elem(connection->send_message_list) == NULL) {
4372
connection->data_write_flag = CL_COM_DATA_NOT_READY;
4374
connection->data_write_flag = CL_COM_DATA_READY;
4377
cl_raw_list_unlock(connection->send_message_list);
4379
/* set last transfer time of connection */
4380
gettimeofday(&connection->last_transfer_time,NULL);
4382
/* Touch endpoint, he is still active */
4383
if (cl_com_connection_get_connect_port(connection, &connect_port) == CL_RETVAL_OK) {
4384
cl_endpoint_list_define_endpoint(cl_com_get_endpoint_list(),
4387
connection->auto_close_type, CL_FALSE );
4390
} else if (connection->data_flow_type == CL_CM_CT_MESSAGE) {
4392
cl_raw_list_lock(connection->send_message_list);
4394
#ifdef CL_DO_COMMLIB_DEBUG
4395
CL_LOG_INT(CL_LOG_INFO,"number of messages in send list:",(int)cl_raw_list_get_elem_count(connection->send_message_list));
4399
message_list_elem = cl_message_list_get_first_elem(connection->send_message_list);
4400
while (message_list_elem != NULL) {
4401
if (message_list_elem->message->message_state == CL_MS_PROTOCOL ||
4402
message_list_elem->message->message_state == CL_MS_READY) {
4403
message_list_elem = cl_message_list_get_next_elem(message_list_elem);
4406
message = message_list_elem->message;
4410
if (message == NULL) {
4411
connection->data_write_flag = CL_COM_DATA_NOT_READY;
4412
cl_raw_list_unlock(connection->send_message_list);
4413
return CL_RETVAL_OK;
4416
next_message_list_elem = cl_message_list_get_next_elem(message_list_elem);
4418
if (message->message_state == CL_MS_INIT_SND) {
4419
unsigned long gmsh_message_size = 0;
4420
unsigned long mih_message_size = 0;
4422
mih_message_size = CL_MIH_MESSAGE_SIZE;
4423
mih_message_size += cl_util_get_ulong_number_length(message->message_id);
4424
mih_message_size += cl_util_get_ulong_number_length(message->message_length);
4425
mih_message_size += strlen(cl_com_get_mih_df_string(message->message_df));
4426
mih_message_size += strlen(cl_com_get_mih_mat_string(message->message_mat));
4427
mih_message_size += cl_util_get_ulong_number_length(message->message_tag);
4428
mih_message_size += cl_util_get_ulong_number_length(message->message_response_id);
4430
if (connection->data_buffer_size < (mih_message_size + 1) ) {
4431
cl_raw_list_unlock(connection->send_message_list);
4432
return CL_RETVAL_STREAM_BUFFER_OVERFLOW;
4435
gmsh_message_size = CL_GMSH_MESSAGE_SIZE + cl_util_get_ulong_number_length(mih_message_size);
4436
if (connection->data_buffer_size < (gmsh_message_size + 1) ) {
4437
cl_raw_list_unlock(connection->send_message_list);
4438
return CL_RETVAL_STREAM_BUFFER_OVERFLOW;
4440
snprintf((char*)connection->data_write_buffer, connection->data_buffer_size, CL_GMSH_MESSAGE , mih_message_size);
4442
gettimeofday(&now,NULL);
4443
connection->write_buffer_timeout_time = now.tv_sec + connection->handler->write_timeout;
4444
connection->data_write_buffer_pos = 0;
4445
connection->data_write_buffer_processed = 0;
4446
connection->data_write_buffer_to_send = gmsh_message_size;
4447
message->message_snd_pointer = 0;
4448
message->message_state = CL_MS_SND_GMSH;
4451
if (message->message_state == CL_MS_SND_GMSH) {
4452
unsigned long written = 0;
4453
return_value = cl_com_write(connection,
4454
&(connection->data_write_buffer[connection->data_write_buffer_pos]),
4455
connection->data_write_buffer_to_send,
4457
connection->data_write_buffer_pos = connection->data_write_buffer_pos + written;
4458
connection->data_write_buffer_to_send = connection->data_write_buffer_to_send - written;
4459
if (return_value != CL_RETVAL_OK) {
4460
cl_raw_list_unlock(connection->send_message_list);
4462
/* recalculate timeout when some data was written */
4464
gettimeofday(&now,NULL);
4465
connection->write_buffer_timeout_time = now.tv_sec + connection->handler->write_timeout;
4466
CL_LOG(CL_LOG_INFO,"recalculate write buffer timeout time (CL_MS_SND_GMSH)");
4468
return return_value;
4470
if (connection->data_write_buffer_to_send > 0) {
4471
cl_raw_list_unlock(connection->send_message_list);
4472
return CL_RETVAL_SEND_ERROR;
4474
connection->statistic->real_bytes_sent = connection->statistic->real_bytes_sent + connection->data_write_buffer_pos ;
4476
sprintf((char*)connection->data_write_buffer, CL_MIH_MESSAGE ,
4477
CL_MIH_MESSAGE_VERSION,
4478
message->message_id,
4479
message->message_length,
4480
cl_com_get_mih_df_string(message->message_df),
4481
cl_com_get_mih_mat_string(message->message_mat),
4482
message->message_tag ,
4483
message->message_response_id);
4485
#ifdef CL_DO_COMMLIB_DEBUG
4486
CL_LOG_STR(CL_LOG_DEBUG,"write buffer:",(char*)connection->data_write_buffer);
4487
CL_LOG_STR(CL_LOG_INFO,"CL_MS_SND_GMSH, MIH: ",(char*)connection->data_write_buffer );
4490
connection->data_write_buffer_pos = 0;
4491
connection->data_write_buffer_processed = 0;
4492
connection->data_write_buffer_to_send = strlen((char*)connection->data_write_buffer);
4494
message->message_snd_pointer = 0;
4495
message->message_state = CL_MS_SND_MIH;
4498
if (message->message_state == CL_MS_SND_MIH) {
4499
unsigned long written = 0;
4500
return_value = cl_com_write(connection,
4501
&(connection->data_write_buffer[connection->data_write_buffer_pos]),
4502
connection->data_write_buffer_to_send,
4504
connection->data_write_buffer_pos = connection->data_write_buffer_pos + written;
4505
connection->data_write_buffer_to_send = connection->data_write_buffer_to_send - written;
4506
if (return_value != CL_RETVAL_OK) {
4507
cl_raw_list_unlock(connection->send_message_list);
4509
/* recalculate timeout when some data was written */
4511
gettimeofday(&now,NULL);
4512
connection->write_buffer_timeout_time = now.tv_sec + connection->handler->write_timeout;
4513
CL_LOG(CL_LOG_INFO,"recalculate write buffer timeout time (CL_MS_SND_MIH)");
4515
return return_value;
4517
if (connection->data_write_buffer_to_send > 0) {
4518
cl_raw_list_unlock(connection->send_message_list);
4519
return CL_RETVAL_SEND_ERROR;
4521
connection->statistic->real_bytes_sent = connection->statistic->real_bytes_sent + connection->data_write_buffer_pos ;
4523
message->message_state = CL_MS_SND;
4526
if (message->message_state == CL_MS_SND) {
4527
unsigned long written = 0;
4528
return_value = cl_com_write(connection,
4529
&(message->message[message->message_snd_pointer]),
4530
message->message_length - message->message_snd_pointer,
4532
message->message_snd_pointer = message->message_snd_pointer + written;
4533
if (return_value != CL_RETVAL_OK) {
4534
cl_raw_list_unlock(connection->send_message_list);
4536
/* recalculate timeout when some data was written */
4538
gettimeofday(&now,NULL);
4539
connection->write_buffer_timeout_time = now.tv_sec + connection->handler->write_timeout;
4540
CL_LOG(CL_LOG_INFO,"recalculate write buffer timeout time (CL_MS_SND)");
4542
return return_value;
4544
if (message->message_snd_pointer == message->message_length) {
4545
message->message_state = CL_MS_READY;
4547
cl_raw_list_unlock(connection->send_message_list);
4548
return CL_RETVAL_SEND_ERROR;
4552
if (message->message_state == CL_MS_READY) {
4554
CL_LOG_STR_STR_INT(CL_LOG_INFO, "sent message to: ", connection->remote->comp_host,
4555
connection->remote->comp_name,
4556
(int)connection->remote->comp_id);
4558
connection->statistic->bytes_sent = connection->statistic->bytes_sent + message->message_length;
4559
connection->statistic->real_bytes_sent = connection->statistic->real_bytes_sent + message->message_length ;
4561
if (connection->connection_state == CL_CONNECTED) {
4562
int connect_port = 0;
4563
/* Touch endpoint, he is still active */
4564
if (cl_com_connection_get_connect_port(connection, &connect_port) == CL_RETVAL_OK) {
4565
cl_endpoint_list_define_endpoint(cl_com_get_endpoint_list(),
4568
connection->auto_close_type, CL_FALSE );
4571
switch (message->message_df) {
4573
CL_LOG(CL_LOG_INFO,"sent connection close message");
4574
connection->connection_sub_state = CL_COM_WAIT_FOR_CCRM;
4575
connection->ccm_sent = 1;
4578
case CL_MIH_DF_CCRM:
4579
CL_LOG(CL_LOG_INFO,"sent connection close response message");
4580
connection->connection_sub_state = CL_COM_DONE; /* CLOSE message */
4581
connection->data_write_flag = CL_COM_DATA_NOT_READY;
4582
connection->ccrm_sent = 1;
4584
/* This client has been gone forever, remove his port from endpoint list */
4585
cl_endpoint_list_undefine_endpoint(cl_com_get_endpoint_list(), connection->remote);
4589
CL_LOG(CL_LOG_INFO,"sent binary message");
4592
CL_LOG(CL_LOG_INFO,"sent XML message");
4595
CL_LOG(CL_LOG_INFO,"sent acknowledge message");
4598
CL_LOG(CL_LOG_INFO,"sent status information message");
4600
case CL_MIH_DF_SIRM:
4601
CL_LOG(CL_LOG_INFO,"sent status information response message");
4605
CL_LOG(CL_LOG_ERROR,"unexpected data format");
4608
connection->write_buffer_timeout_time = 0;
4609
gettimeofday(&message->message_send_time,NULL);
4610
cl_com_add_debug_message(connection, NULL, message);
4612
/* set last transfer time of connection */
4613
memcpy(&connection->last_transfer_time,&message->message_send_time,sizeof(struct timeval) );
4614
switch(message->message_df) {
4616
/* don't remove message */
4617
message->message_state = CL_MS_PROTOCOL; /* wait for SIRM, do not delete message */
4620
/* remove messages which don't want an acknoledge */
4622
if (message->message_mat == CL_MIH_MAT_NAK) {
4623
if (cl_message_list_remove_send(connection, message, 0) == CL_RETVAL_OK) {
4624
cl_com_free_message(&message);
4625
#ifdef CL_DO_COMMLIB_DEBUG
4626
CL_LOG(CL_LOG_DEBUG,"last sent message removed from send_message_list");
4630
message->message_state = CL_MS_PROTOCOL; /* wait for ACK, do not delete message */
4634
/* try to find out if there are more messages to send */
4636
message_list_elem = next_message_list_elem;
4637
while (message_list_elem != NULL) {
4638
if (message_list_elem->message->message_state == CL_MS_PROTOCOL ||
4639
message_list_elem->message->message_state == CL_MS_READY) {
4640
message_list_elem = cl_message_list_get_next_elem(message_list_elem);
4643
message = message_list_elem->message;
4648
if (message == NULL) {
4649
/* no messages to send, data is not ready */
4650
connection->data_write_flag = CL_COM_DATA_NOT_READY;
4653
cl_raw_list_unlock(connection->send_message_list);
4655
if (connection->ccm_received == 1) {
4656
CL_LOG(CL_LOG_INFO,"received ccm");
4658
CL_LOG_INT(CL_LOG_WARNING,"receive buffer:",(int)cl_raw_list_get_elem_count(connection->received_message_list) );
4659
CL_LOG_INT(CL_LOG_WARNING,"send buffer :",(int)cl_raw_list_get_elem_count(connection->send_message_list) );
4660
CL_LOG_INT(CL_LOG_WARNING,"ccm_received :",(int)connection->ccm_received);
4662
if( cl_raw_list_get_elem_count(connection->send_message_list) == 0 &&
4663
cl_raw_list_get_elem_count(connection->received_message_list) == 0 ) {
4664
connection->ccm_received = 2;
4665
connection->connection_sub_state = CL_COM_SENDING_CCRM;
4666
/* disable #if 1 if you want to test a client, that is frozen! */
4667
cl_commlib_send_ccrm_message(connection);
4668
CL_LOG(CL_LOG_WARNING,"sending ccrm");
4670
CL_LOG(CL_LOG_WARNING,"can't send ccrm, still messages in buffer");
4674
return return_value;
4678
/* receive message from host, component, component id from handle */
4679
#ifdef __CL_FUNCTION__
4680
#undef __CL_FUNCTION__
4682
#define __CL_FUNCTION__ "cl_commlib_receive_message()"
4683
int cl_commlib_receive_message(cl_com_handle_t* handle,
4684
char* un_resolved_hostname,
4685
char* component_name,
4686
unsigned long component_id,
4688
unsigned long response_mid,
4689
cl_com_message_t** message,
4690
cl_com_endpoint_t** sender ) {
4691
cl_com_connection_t* connection = NULL;
4692
cl_message_list_elem_t* message_elem = NULL;
4695
long my_timeout = 0;
4696
int message_sent = 0;
4698
int leave_reason = CL_RETVAL_OK;
4699
int message_match = 1;
4702
cl_commlib_check_callback_functions();
4704
if (handle == NULL || message == NULL || *message != NULL) {
4705
return CL_RETVAL_PARAMS;
4708
if (synchron == CL_TRUE) {
4709
gettimeofday(&now,NULL);
4710
my_timeout = now.tv_sec + handle->synchron_receive_timeout;
4713
if (un_resolved_hostname != NULL || component_name != NULL || component_id != 0) {
4714
CL_LOG(CL_LOG_DEBUG,"message filtering not supported");
4717
leave_reason = CL_RETVAL_OK;
4718
/* return if there are no connections in list */
4720
/* If messages_ready_for_read is > 0 there are messages with state CL_MS_READY in receive
4721
* message lists for application
4723
pthread_mutex_lock(handle->messages_ready_mutex);
4724
if (handle->messages_ready_for_read != 0) {
4725
cl_app_message_queue_elem_t* app_mq_elem = NULL;
4727
/* when accessing a connection from the received_message_queue it is
4728
* important to hold the messages_ready_mutex lock, because
4729
* cl_connection_list_destroy_connections_to_close() will also lock
4730
* the mutex to remove all queue entries referencing to a connection
4731
* which will be removed. So as long the messages_ready_mutex lock
4732
* is owned the connection is guilty!
4736
cl_raw_list_lock(handle->received_message_queue);
4737
app_mq_elem = cl_app_message_queue_get_first_elem(handle->received_message_queue);
4738
while (app_mq_elem != NULL) {
4739
connection = app_mq_elem->rcv_connection;
4741
/* TODO: filter messages for endpoint, if specified and open connection if necessary (use cl_commlib_open_connection()) */
4743
/* try to find complete received message in received message list of this connection */
4744
cl_raw_list_lock(connection->received_message_list);
4745
message_elem = cl_message_list_get_first_elem(connection->received_message_list);
4746
while(message_elem) {
4747
if (message_elem->message->message_state == CL_MS_READY) {
4748
message_match = 1; /* always match the message */
4750
/* try to find response for mid */
4751
/* TODO: Just return a matchin response_mid !!! 0 = match all else match response_id */
4752
if (response_mid != 0) {
4753
if(message_elem->message->message_response_id != response_mid) {
4754
/* if the response_mid parameter is set we can't return this message because
4755
the response id is not the same. -> We have to wait for the correct message */
4756
if ( response_mid > connection->last_send_message_id || connection->last_send_message_id == 0 ) {
4757
CL_LOG(CL_LOG_WARNING, "protocol error: can't wait for unsent message!!!");
4758
cl_raw_list_unlock(connection->received_message_list);
4759
cl_raw_list_unlock(handle->received_message_queue);
4760
pthread_mutex_unlock(handle->messages_ready_mutex);
4761
return CL_RETVAL_PROTOCOL_ERROR;
4764
if ( response_mid > message_elem->message->message_response_id ) {
4765
CL_LOG(CL_LOG_INFO, "protocol error: There is still a lower message id than requested");
4770
CL_LOG_INT(CL_LOG_INFO,"received response for message id", (int)response_mid);
4773
/* never return a message with response id when response_mid is 0 */
4774
if (message_elem->message->message_response_id != 0) {
4775
if (handle->do_shutdown != 2) {
4776
CL_LOG_INT(CL_LOG_INFO,"message response id is set for this message:", (int)message_elem->message->message_response_id);
4779
/* this is handle shutdown mode without returning any message to application */
4780
/* cl_commlib_shutdown_handle() has to delete this message */
4781
CL_LOG_INT(CL_LOG_WARNING,"returning response message without request:", (int)message_elem->message->message_response_id);
4786
if (message_match == 1) {
4787
/* remove message from received message list*/
4788
*message = message_elem->message;
4789
CL_LOG(CL_LOG_INFO, "fetched message from received message queue");
4790
cl_message_list_remove_receive(connection, *message, 0);
4792
/* release the message list */
4793
cl_raw_list_unlock(connection->received_message_list);
4795
if (sender != NULL) {
4796
*sender = cl_com_dup_endpoint(connection->remote);
4800
#ifndef CL_DO_SEND_ACK_AT_COMMLIB_LAYER
4801
/* send a message acknowledge when application gets the message */
4803
/* send acknowledge for CL_MIH_MAT_ACK type (application removed message from buffer) */
4804
if ( (*message)->message_mat == CL_MIH_MAT_ACK) {
4805
cl_commlib_send_ack_message(connection, *message );
4808
#endif /* CL_DO_SEND_ACK_AT_COMMLIB_LAYER */
4810
if (connection->ccm_received == 1) {
4811
CL_LOG(CL_LOG_INFO,"received ccm");
4813
CL_LOG_INT(CL_LOG_WARNING,"receive buffer:",(int)cl_raw_list_get_elem_count(connection->received_message_list) );
4814
CL_LOG_INT(CL_LOG_WARNING,"send buffer :",(int)cl_raw_list_get_elem_count(connection->send_message_list) );
4815
CL_LOG_INT(CL_LOG_WARNING,"ccm_received :",(int)connection->ccm_received);
4817
if( cl_raw_list_get_elem_count(connection->send_message_list) == 0 &&
4818
cl_raw_list_get_elem_count(connection->received_message_list) == 0 ) {
4819
connection->ccm_received = 2;
4820
connection->connection_sub_state = CL_COM_SENDING_CCRM;
4821
/* disable #if 1 if you want to test a client, that is frozen! */
4822
cl_commlib_send_ccrm_message(connection);
4824
CL_LOG(CL_LOG_WARNING,"sending ccrm");
4826
CL_LOG(CL_LOG_WARNING,"can't send ccrm, still messages in buffer");
4830
handle->last_receive_message_connection = connection;
4832
/* decrease counter for ready messages */
4833
handle->messages_ready_for_read = handle->messages_ready_for_read - 1;
4834
cl_app_message_queue_remove(handle->received_message_queue, connection, 0, CL_FALSE);
4835
cl_raw_list_unlock(handle->received_message_queue);
4836
pthread_mutex_unlock(handle->messages_ready_mutex);
4838
if ( message_sent ) {
4839
switch(cl_com_create_threads) {
4841
CL_LOG(CL_LOG_INFO,"no threads enabled");
4842
/* we just want to trigger write , no wait for read*/
4843
cl_commlib_trigger(handle, 1);
4846
/* we just want to trigger write , no wait for read*/
4847
CL_LOG(CL_LOG_INFO,"trigger write thread");
4848
cl_thread_trigger_event(handle->write_thread);
4852
return CL_RETVAL_OK;
4855
/* get next message */
4856
message_elem = cl_message_list_get_next_elem(message_elem);
4858
/* unlock message list */
4859
cl_raw_list_unlock(connection->received_message_list);
4861
app_mq_elem = cl_app_message_queue_get_next_elem(app_mq_elem);
4863
cl_raw_list_unlock(handle->received_message_queue);
4864
pthread_mutex_unlock(handle->messages_ready_mutex);
4865
CL_LOG(CL_LOG_INFO,"got no message, but thought there should be one");
4867
/* cl_commlib_received is used together with cl_commlib_trigger.
4868
If the received message list is not empty the trigger does not
4869
sleep and if the desired message is not in the list this
4870
behaviour would cause the application to use 100% CPU. To
4871
prevent this we do the core part of the trigger and wait
4872
for the timeout or till a new message was received */
4873
if (cl_com_create_threads == CL_RW_THREAD) {
4874
cl_thread_wait_for_thread_condition(handle->app_condition ,
4875
handle->select_sec_timeout,
4876
handle->select_usec_timeout);
4880
pthread_mutex_unlock(handle->messages_ready_mutex);
4882
/* return if there are no connections in list */
4883
/* check if it is possible to receive messages */
4884
if (handle->service_provider == CL_FALSE) {
4885
cl_raw_list_lock(handle->send_message_queue);
4886
if (cl_connection_list_get_first_elem(handle->send_message_queue) == NULL) {
4887
cl_raw_list_lock(handle->connection_list);
4888
if (cl_connection_list_get_first_elem(handle->connection_list) == NULL) {
4889
leave_reason = CL_RETVAL_CONNECTION_NOT_FOUND;
4891
cl_raw_list_unlock(handle->connection_list);
4893
cl_raw_list_unlock(handle->send_message_queue);
4897
if (synchron == CL_TRUE) {
4898
switch(cl_com_create_threads) {
4900
cl_commlib_trigger(handle, 1);
4903
cl_thread_trigger_event(handle->read_thread);
4904
return_value = cl_thread_wait_for_thread_condition(handle->app_condition,
4905
handle->select_sec_timeout,
4906
handle->select_usec_timeout);
4907
if (return_value == CL_RETVAL_CONDITION_WAIT_TIMEOUT) {
4908
CL_LOG(CL_LOG_INFO,"APPLICATION GOT CONDITION WAIT TIMEOUT");
4912
/* at this point the handle->connection_list must be unlocked */
4913
if (leave_reason == CL_RETVAL_CONNECTION_NOT_FOUND) {
4915
* we are no service provider AND we have no connection !
4916
* we can't wait for a (possible) new connection, so we return immediately
4918
return leave_reason;
4920
gettimeofday(&now,NULL);
4921
if (now.tv_sec > my_timeout) {
4922
return CL_RETVAL_SYNC_RECEIVE_TIMEOUT;
4925
} while (synchron == CL_TRUE && cl_com_get_ignore_timeouts_flag() == CL_FALSE);
4928
* when leave_reason is CL_RETVAL_CONNECTION_NOT_FOUND the connection list
4929
* is empty return this as indication for an error otherwise return
4930
* CL_RETVAL_NO_MESSAGE to indicate that there is no message available
4933
if (leave_reason == CL_RETVAL_OK) {
4934
return CL_RETVAL_NO_MESSAGE;
4936
return leave_reason;
4942
* o search for endpoint with matching hostname, component name or component id
4943
* o un_resolved_hostname, component_name and component_id can be NULL or 0
4944
* o caller must free returned endpoint list with cl_endpoint_list_cleanup()
4948
#ifdef __CL_FUNCTION__
4949
#undef __CL_FUNCTION__
4951
#define __CL_FUNCTION__ "cl_commlib_search_endpoint()"
4952
int cl_commlib_search_endpoint(cl_com_handle_t* handle,
4953
char* un_resolved_hostname, char* component_name, unsigned long component_id,
4954
cl_bool_t only_connected,
4955
cl_raw_list_t** endpoint_list) {
4957
cl_com_connection_t* connection = NULL;
4958
cl_connection_list_elem_t* elem = NULL;
4959
char* resolved_hostname = NULL;
4960
int retval = CL_RETVAL_OK;
4963
if (handle == NULL || endpoint_list == NULL || *endpoint_list != NULL) {
4964
return CL_RETVAL_PARAMS;
4967
if (un_resolved_hostname != NULL) {
4968
retval = cl_com_cached_gethostbyname(un_resolved_hostname, &resolved_hostname, NULL, NULL, NULL );
4969
if (retval != CL_RETVAL_OK) {
4970
CL_LOG_STR(CL_LOG_ERROR,"could not resolve host",un_resolved_hostname);
4975
retval = cl_endpoint_list_setup(endpoint_list, "matching endpoints", 0, 0, CL_TRUE);
4976
if (retval != CL_RETVAL_OK) {
4977
free(resolved_hostname);
4978
resolved_hostname = NULL;
4979
cl_endpoint_list_cleanup(endpoint_list);
4983
cl_raw_list_lock(handle->connection_list);
4984
elem = cl_connection_list_get_first_elem(handle->connection_list);
4986
connection = elem->connection;
4987
elem = cl_connection_list_get_next_elem(elem);
4989
if (connection->remote != NULL) {
4990
if (component_id > 0) {
4991
if (connection->remote->comp_id == component_id) {
4992
cl_endpoint_list_define_endpoint(*endpoint_list, connection->remote, 0, connection->auto_close_type, CL_FALSE );
4996
if (component_name != NULL && connection->remote->comp_name != NULL ) {
4997
if (strcasecmp(connection->remote->comp_name, component_name) == 0) {
4998
cl_endpoint_list_define_endpoint(*endpoint_list, connection->remote, 0, connection->auto_close_type, CL_FALSE );
5002
if (resolved_hostname != NULL) {
5003
if (cl_com_compare_hosts(resolved_hostname, connection->remote->comp_host ) == CL_RETVAL_OK) {
5004
cl_endpoint_list_define_endpoint(*endpoint_list, connection->remote, 0, connection->auto_close_type, CL_FALSE );
5010
cl_raw_list_unlock(handle->connection_list);
5012
if ( only_connected == CL_FALSE ) {
5013
/* also search in known endpoint list for matching connections */
5014
cl_raw_list_t* global_endpoint_list = cl_com_get_endpoint_list();
5016
if ( global_endpoint_list != NULL ) {
5017
cl_endpoint_list_elem_t* endpoint_elem = NULL;
5018
cl_endpoint_list_elem_t* act_endpoint_elem = NULL;
5020
cl_raw_list_lock(global_endpoint_list);
5021
endpoint_elem = cl_endpoint_list_get_first_elem(global_endpoint_list);
5022
while(endpoint_elem) {
5023
act_endpoint_elem = endpoint_elem;
5024
endpoint_elem = cl_endpoint_list_get_next_elem(endpoint_elem);
5026
if (act_endpoint_elem->endpoint) {
5027
if ( component_id > 0 ) {
5028
if ( act_endpoint_elem->endpoint->comp_id == component_id ) {
5029
cl_endpoint_list_define_endpoint(*endpoint_list,
5030
act_endpoint_elem->endpoint,
5031
act_endpoint_elem->service_port,
5032
act_endpoint_elem->autoclose,
5033
act_endpoint_elem->is_static );
5037
if ( component_name != NULL && act_endpoint_elem->endpoint->comp_name != NULL ) {
5038
if ( strcmp(act_endpoint_elem->endpoint->comp_name, component_name) == 0 ) {
5039
cl_endpoint_list_define_endpoint(*endpoint_list,
5040
act_endpoint_elem->endpoint,
5041
act_endpoint_elem->service_port,
5042
act_endpoint_elem->autoclose,
5043
act_endpoint_elem->is_static );
5047
if ( resolved_hostname != NULL) {
5048
if ( cl_com_compare_hosts( resolved_hostname, act_endpoint_elem->endpoint->comp_host ) == CL_RETVAL_OK ) {
5049
cl_endpoint_list_define_endpoint(*endpoint_list,
5050
act_endpoint_elem->endpoint,
5051
act_endpoint_elem->service_port,
5052
act_endpoint_elem->autoclose,
5053
act_endpoint_elem->is_static );
5060
cl_raw_list_unlock(global_endpoint_list);
5063
free(resolved_hostname);
5064
resolved_hostname = NULL;
5065
return CL_RETVAL_OK;
5068
#ifdef __CL_FUNCTION__
5069
#undef __CL_FUNCTION__
5071
#define __CL_FUNCTION__ "cl_commlib_send_ack_message()"
5072
/* connection_list is locked inside of this call */
5073
static int cl_commlib_send_ack_message(cl_com_connection_t* connection, cl_com_message_t* message) {
5074
cl_byte_t* ack_message_data = NULL;
5075
unsigned long ack_message_size = 0;
5076
int ret_val = CL_RETVAL_OK;
5077
cl_com_message_t* ack_message = NULL;
5079
if (connection == NULL || message == NULL) {
5080
return CL_RETVAL_PARAMS;
5083
ack_message_size = CL_AM_MESSAGE_SIZE;
5084
ack_message_size = ack_message_size + cl_util_get_ulong_number_length(message->message_id);
5086
ack_message_data = (cl_byte_t*)malloc(sizeof(cl_byte_t)* ( ack_message_size + 1) ) ;
5087
if (ack_message_data == NULL) {
5088
return CL_RETVAL_MALLOC;
5090
sprintf((char*)ack_message_data,CL_AM_MESSAGE, CL_AM_MESSAGE_VERSION, message->message_id);
5092
ret_val = cl_com_setup_message(&ack_message, connection, ack_message_data, ack_message_size, CL_MIH_MAT_NAK, 0, 0);
5093
if (ret_val != CL_RETVAL_OK) {
5096
ack_message->message_df = CL_MIH_DF_AM;
5098
CL_LOG_INT(CL_LOG_INFO,"sending ack for message=", (int)message->message_id);
5100
ret_val = cl_message_list_append_send(connection, ack_message, 1);
5105
/* connection_list is locked inside of this call */
5106
#ifdef __CL_FUNCTION__
5107
#undef __CL_FUNCTION__
5109
#define __CL_FUNCTION__ "cl_commlib_send_ccm_message()"
5110
static int cl_commlib_send_ccm_message(cl_com_connection_t* connection) {
5111
cl_byte_t* ccm_message_data = NULL;
5112
unsigned long ccm_message_size = 0;
5113
int ret_val = CL_RETVAL_OK;
5114
cl_com_message_t* ccm_message = NULL;
5116
if (connection == NULL ) {
5117
return CL_RETVAL_PARAMS;
5120
ccm_message_size = CL_CCM_MESSAGE_SIZE;
5122
ccm_message_data = (cl_byte_t*)malloc(sizeof(cl_byte_t)* ( ccm_message_size + 1) ) ;
5123
if (ccm_message_data == NULL) {
5124
return CL_RETVAL_MALLOC;
5126
sprintf((char*)ccm_message_data, CL_CCM_MESSAGE, CL_CCM_MESSAGE_VERSION);
5128
ret_val = cl_com_setup_message(&ccm_message, connection, ccm_message_data , ccm_message_size , CL_MIH_MAT_NAK , 0 ,0);
5129
if (ret_val != CL_RETVAL_OK) {
5132
ccm_message->message_df = CL_MIH_DF_CCM;
5133
CL_LOG(CL_LOG_INFO,"sending connection close message");
5134
ret_val = cl_message_list_append_send(connection, ccm_message, 1);
5138
/* connection_list is locked inside of this call */
5139
static int cl_commlib_send_sim_message(cl_com_connection_t* connection, unsigned long* mid) {
5140
cl_byte_t* sim_message_data = NULL;
5141
unsigned long sim_message_size = 0;
5142
int ret_val = CL_RETVAL_OK;
5143
cl_com_message_t* sim_message = NULL;
5145
if (connection == NULL ) {
5146
return CL_RETVAL_PARAMS;
5149
sim_message_size = CL_SIM_MESSAGE_SIZE;
5151
sim_message_data = (cl_byte_t*)malloc(sizeof(cl_byte_t)* ( sim_message_size + 1) ) ;
5152
if (sim_message_data == NULL) {
5153
return CL_RETVAL_MALLOC;
5155
sprintf((char*)sim_message_data, CL_SIM_MESSAGE, CL_SIM_MESSAGE_VERSION);
5157
ret_val = cl_com_setup_message(&sim_message, connection, sim_message_data , sim_message_size , CL_MIH_MAT_NAK , 0 ,0);
5158
if (ret_val != CL_RETVAL_OK) {
5161
sim_message->message_df = CL_MIH_DF_SIM;
5163
*mid = sim_message->message_id;
5165
CL_LOG(CL_LOG_INFO,"sending information message (SIM)");
5166
ret_val = cl_message_list_append_send(connection, sim_message, 1);
5170
/* connection_list is locked inside of this call */
5171
static int cl_commlib_send_sirm_message(cl_com_connection_t* connection,
5172
cl_com_message_t* message,
5173
unsigned long starttime,
5174
unsigned long runtime,
5175
unsigned long buffered_read_messages,
5176
unsigned long buffered_write_messages,
5177
unsigned long connection_count,
5178
unsigned long application_status,
5180
cl_byte_t* sirm_message_data = NULL;
5181
char* xml_infotext = NULL;
5182
unsigned long sirm_message_size = 0;
5183
int ret_val = CL_RETVAL_OK;
5184
cl_com_message_t* sirm_message = NULL;
5186
if (connection == NULL|| message == NULL || infotext == NULL ) {
5187
return CL_RETVAL_PARAMS;
5190
ret_val = cl_com_transformString2XML(infotext, &xml_infotext);
5191
if (ret_val != CL_RETVAL_OK) {
5195
sirm_message_size = CL_SIRM_MESSAGE_SIZE; /* already contains sizeof version */
5196
sirm_message_size += cl_util_get_ulong_number_length(message->message_id);
5197
sirm_message_size += cl_util_get_ulong_number_length(starttime);
5198
sirm_message_size += cl_util_get_ulong_number_length(runtime);
5199
sirm_message_size += cl_util_get_ulong_number_length(buffered_read_messages);
5200
sirm_message_size += cl_util_get_ulong_number_length(buffered_write_messages);
5201
sirm_message_size += cl_util_get_ulong_number_length(connection_count);
5202
sirm_message_size += cl_util_get_ulong_number_length(application_status);
5203
sirm_message_size += strlen(xml_infotext);
5205
sirm_message_data = (cl_byte_t*)malloc(sizeof(cl_byte_t)* (sirm_message_size + 1));
5206
if (sirm_message_data == NULL) {
5207
if (xml_infotext != NULL) {
5209
xml_infotext = NULL;
5211
return CL_RETVAL_MALLOC;
5213
sprintf((char*)sirm_message_data, CL_SIRM_MESSAGE,
5214
CL_SIRM_MESSAGE_VERSION,
5215
message->message_id,
5218
buffered_read_messages,
5219
buffered_write_messages,
5224
if (xml_infotext != NULL) {
5226
xml_infotext = NULL;
5229
ret_val = cl_com_setup_message(&sirm_message, connection, sirm_message_data , sirm_message_size , CL_MIH_MAT_NAK , 0 ,0);
5230
if (ret_val != CL_RETVAL_OK) {
5233
sirm_message->message_df = CL_MIH_DF_SIRM;
5234
CL_LOG_INT(CL_LOG_INFO,"sending SIRM for message=", (int)message->message_id);
5236
ret_val = cl_message_list_append_send(connection, sirm_message, 1);
5241
/* connection_list is locked inside of this call */
5242
#ifdef __CL_FUNCTION__
5243
#undef __CL_FUNCTION__
5245
#define __CL_FUNCTION__ "cl_commlib_send_ccrm_message()"
5246
static int cl_commlib_send_ccrm_message(cl_com_connection_t* connection) {
5247
cl_byte_t* ccrm_message_data = NULL;
5248
unsigned long ccrm_message_size = 0;
5249
int ret_val = CL_RETVAL_OK;
5250
cl_com_message_t* ccrm_message = NULL;
5252
if (connection == NULL ) {
5253
return CL_RETVAL_PARAMS;
5256
ccrm_message_size = CL_CCRM_MESSAGE_SIZE;
5258
ccrm_message_data = (cl_byte_t*)malloc(sizeof(cl_byte_t)* (ccrm_message_size + 1)) ;
5259
if (ccrm_message_data == NULL) {
5260
return CL_RETVAL_MALLOC;
5262
sprintf((char*)ccrm_message_data, CL_CCRM_MESSAGE, CL_CCRM_MESSAGE_VERSION);
5264
ret_val = cl_com_setup_message(&ccrm_message, connection, ccrm_message_data , ccrm_message_size , CL_MIH_MAT_NAK , 0 ,0);
5265
if (ret_val != CL_RETVAL_OK) {
5268
ccrm_message->message_df = CL_MIH_DF_CCRM;
5269
CL_LOG(CL_LOG_INFO,"sending connection close response message");
5270
ret_val = cl_message_list_append_send(connection, ccrm_message, 1);
5278
CL_MIH_MAT_NAK = send message and don't expect any acknowledge
5279
CL_MIH_MAT_ACK = send message and block till communication partner application has read the message
5280
CL_MIH_MAT_SYNC = send message and block till communication partner application has processed all message actions (TODO)
5283
#ifdef __CL_FUNCTION__
5284
#undef __CL_FUNCTION__
5286
#define __CL_FUNCTION__ "cl_commlib_check_for_ack()"
5287
int cl_commlib_check_for_ack(cl_com_handle_t* handle, char* un_resolved_hostname, char* component_name, unsigned long component_id, unsigned long mid , cl_bool_t do_block) {
5288
int found_message = 0;
5289
int message_added = 0;
5290
cl_connection_list_elem_t* elem = NULL;
5291
cl_com_connection_t* connection = NULL;
5292
cl_com_endpoint_t receiver;
5293
cl_message_list_elem_t* message_list_elem = NULL;
5294
cl_message_list_elem_t* next_message_list_elem = NULL;
5295
cl_com_message_t* message = NULL;
5296
int return_value = CL_RETVAL_OK;
5297
char* unique_hostname = NULL;
5298
struct in_addr in_addr;
5301
cl_commlib_check_callback_functions();
5303
if ( handle == NULL) {
5304
return CL_RETVAL_HANDLE_NOT_FOUND;
5307
/* check endpoint parameters: un_resolved_hostname, componenet_name and componenet_id */
5308
if ( un_resolved_hostname == NULL || component_name == NULL || component_id == 0 ) {
5309
return CL_RETVAL_UNKNOWN_ENDPOINT;
5312
/* resolve hostname */
5313
return_value = cl_com_cached_gethostbyname(un_resolved_hostname, &unique_hostname, &in_addr, NULL, NULL);
5314
if (return_value != CL_RETVAL_OK) {
5315
return return_value;
5318
/* setup endpoint */
5319
receiver.comp_host = unique_hostname;
5320
receiver.comp_name = component_name;
5321
receiver.comp_id = component_id;
5322
receiver.addr.s_addr = in_addr.s_addr;
5323
receiver.hash_id = cl_create_endpoint_string(&receiver);
5324
if (receiver.hash_id == NULL) {
5325
free(unique_hostname);
5326
return CL_RETVAL_MALLOC;
5329
while(do_stop == 0) {
5331
/* lock handle connection list */
5332
cl_raw_list_lock(handle->connection_list);
5335
elem = cl_connection_list_get_elem_endpoint(handle->connection_list, &receiver);
5337
connection = elem->connection;
5339
/* check for message acknowledge */
5340
cl_raw_list_lock(connection->send_message_list);
5342
message_list_elem = cl_message_list_get_first_elem(connection->send_message_list);
5343
while(message_list_elem != NULL && found_message == 0) {
5344
message = message_list_elem->message;
5345
next_message_list_elem = cl_message_list_get_next_elem(message_list_elem);
5346
if (message->message_id == mid) {
5349
if (message->message_ack_flag == 1) {
5350
cl_message_list_remove_send(connection, message, 0);
5351
cl_com_free_message(&message);
5352
cl_raw_list_unlock(connection->send_message_list);
5354
if (connection->ccm_received == 1) {
5355
CL_LOG(CL_LOG_INFO,"received ccm");
5357
CL_LOG_INT(CL_LOG_WARNING,"receive buffer:",(int)cl_raw_list_get_elem_count(connection->received_message_list) );
5358
CL_LOG_INT(CL_LOG_WARNING,"send buffer :",(int)cl_raw_list_get_elem_count(connection->send_message_list) );
5359
CL_LOG_INT(CL_LOG_WARNING,"ccm_received :",(int)connection->ccm_received);
5361
if( cl_raw_list_get_elem_count(connection->send_message_list) == 0 &&
5362
cl_raw_list_get_elem_count(connection->received_message_list) == 0 ) {
5363
connection->ccm_received = 2;
5364
connection->connection_sub_state = CL_COM_SENDING_CCRM;
5365
/* disable #if 1 if you want to test a client, that is frozen! */
5366
cl_commlib_send_ccrm_message(connection);
5368
CL_LOG(CL_LOG_WARNING,"sending ccrm");
5370
CL_LOG(CL_LOG_WARNING,"can't send ccrm, still messages in buffer");
5374
cl_raw_list_unlock(handle->connection_list);
5375
free(unique_hostname);
5376
free(receiver.hash_id);
5377
CL_LOG_INT(CL_LOG_INFO,"got message acknowledge:",(int)mid);
5378
if (message_added) {
5379
switch(cl_com_create_threads) {
5381
CL_LOG(CL_LOG_INFO,"no threads enabled");
5382
/* we just want to trigger write , no wait for read*/
5383
cl_commlib_trigger(handle, 1);
5386
/* we just want to trigger write , no wait for read*/
5387
cl_thread_trigger_event(handle->write_thread);
5392
return CL_RETVAL_OK;
5394
CL_LOG_INT(CL_LOG_INFO,"message is not acknowledged:", (int)mid);
5397
message_list_elem = next_message_list_elem;
5399
cl_raw_list_unlock(connection->send_message_list);
5401
CL_LOG_STR(CL_LOG_ERROR,"can't find connection to:", receiver.comp_host);
5402
cl_raw_list_unlock(handle->connection_list);
5403
free(unique_hostname);
5404
free(receiver.hash_id);
5405
return CL_RETVAL_CONNECTION_NOT_FOUND;
5407
cl_raw_list_unlock(handle->connection_list);
5409
if (found_message == 0) {
5410
CL_LOG_INT(CL_LOG_ERROR,"message not found or removed because of ack timeout", (int)mid);
5411
free(unique_hostname);
5412
free(receiver.hash_id);
5413
return CL_RETVAL_MESSAGE_ACK_ERROR; /* message not found or removed because of ack timeout */
5416
if (do_block == CL_TRUE) {
5417
switch(cl_com_create_threads) {
5419
CL_LOG(CL_LOG_INFO,"no threads enabled");
5420
cl_commlib_trigger(handle, 1);
5423
cl_thread_wait_for_thread_condition(handle->read_condition,
5424
handle->select_sec_timeout,
5425
handle->select_usec_timeout);
5429
free(unique_hostname);
5430
free(receiver.hash_id);
5431
return CL_RETVAL_MESSAGE_WAIT_FOR_ACK;
5434
return CL_RETVAL_UNKNOWN;
5438
#ifdef __CL_FUNCTION__
5439
#undef __CL_FUNCTION__
5441
#define __CL_FUNCTION__ "cl_commlib_open_connection()"
5442
int cl_commlib_open_connection(cl_com_handle_t* handle, char* un_resolved_hostname, char* component_name, unsigned long component_id) {
5445
char* unique_hostname = NULL;
5446
struct in_addr in_addr;
5447
cl_bool_t messages_for_app = CL_FALSE;
5448
cl_com_endpoint_t receiver;
5449
cl_connection_list_elem_t* elem = NULL;
5450
cl_com_connection_t* connection = NULL;
5451
cl_com_connection_t* new_con = NULL;
5452
cl_com_endpoint_t* remote_endpoint = NULL;
5453
cl_com_endpoint_t* local_endpoint = NULL;
5455
int shutdown_received = 0;
5457
cl_commlib_check_callback_functions();
5459
/* check endpoint parameters: un_resolved_hostname , componenet_name and componenet_id */
5460
if (un_resolved_hostname == NULL || component_name == NULL || component_id == 0 ) {
5461
CL_LOG(CL_LOG_ERROR,cl_get_error_text(CL_RETVAL_UNKNOWN_ENDPOINT));
5462
return CL_RETVAL_UNKNOWN_ENDPOINT;
5465
CL_LOG_STR(CL_LOG_INFO,"open host :",un_resolved_hostname );
5466
CL_LOG_STR(CL_LOG_INFO,"open component_name :",component_name );
5467
CL_LOG_INT(CL_LOG_INFO,"open component_id :",(int)component_id );
5470
if (handle == NULL) {
5471
CL_LOG(CL_LOG_ERROR,cl_get_error_text(CL_RETVAL_HANDLE_NOT_FOUND));
5472
return CL_RETVAL_HANDLE_NOT_FOUND;
5475
/* resolve hostname */
5476
ret_val = cl_com_cached_gethostbyname(un_resolved_hostname, &unique_hostname,
5477
&in_addr, NULL, NULL);
5478
if (ret_val != CL_RETVAL_OK) {
5479
CL_LOG(CL_LOG_ERROR,cl_get_error_text(ret_val));
5483
/* setup endpoint */
5484
receiver.comp_host = unique_hostname;
5485
receiver.comp_name = component_name;
5486
receiver.comp_id = component_id;
5487
receiver.addr.s_addr = in_addr.s_addr;
5488
receiver.hash_id = cl_create_endpoint_string(&receiver);
5489
if (receiver.hash_id == NULL) {
5490
free(unique_hostname);
5491
return CL_RETVAL_MALLOC;
5494
/* lock handle mutex, because we don't want to allow more threads to create a new
5495
connection for the same receiver endpoint */
5496
pthread_mutex_lock(handle->connection_list_mutex);
5499
/* lock handle connection list */
5500
cl_raw_list_lock(handle->connection_list);
5503
elem = cl_connection_list_get_elem_endpoint(handle->connection_list, &receiver);
5506
connection = elem->connection;
5508
if (connection->ccm_received != 0) {
5510
/* we have sent connection close response, do not accept any new message */
5511
/* we wait till connection is down and try to reconnect */
5512
CL_LOG(CL_LOG_ERROR,"connection is open, but going down now");
5513
gettimeofday(&now,NULL);
5514
connection->shutdown_timeout = now.tv_sec + handle->acknowledge_timeout + handle->close_connection_timeout;
5515
shutdown_received = 1;
5516
} else if (connection->connection_state == CL_CONNECTED &&
5517
connection->connection_sub_state != CL_COM_WORK) {
5518
CL_LOG(CL_LOG_ERROR,"connection is open, but going down now");
5519
gettimeofday(&now,NULL);
5520
connection->shutdown_timeout = now.tv_sec + handle->acknowledge_timeout + handle->close_connection_timeout;
5521
shutdown_received = 2;
5523
/* connection is open, just return */
5524
cl_raw_list_unlock(handle->connection_list);
5525
CL_LOG(CL_LOG_WARNING,"connection is already open");
5526
free(unique_hostname);
5527
free(receiver.hash_id);
5528
unique_hostname = NULL;
5529
receiver.comp_host = NULL;
5531
/* unlock connection list */
5532
pthread_mutex_unlock(handle->connection_list_mutex);
5533
return CL_RETVAL_OK;
5536
cl_raw_list_unlock(handle->connection_list);
5539
if (shutdown_received != 0) {
5540
/* the connection is going down -> wait for removal of connection*/
5541
int still_in_list = 1;
5542
/* TODO: save unread messages with state READY in new connection object? If so,
5543
the application don't have to take care about any error messages. But for now
5544
it should be ok to let the application hande those kind of problems. (gdi) */
5546
/* we don't have a port to connect to, re open would fail, return now */
5547
if (handle->connect_port <= 0) {
5549
if (cl_com_get_known_endpoint_port(&receiver, &tcp_port) != CL_RETVAL_OK) {
5550
CL_LOG(CL_LOG_ERROR,"no port to connect");
5551
free(unique_hostname); /* don't access receiver after this */
5552
free(receiver.hash_id);
5553
unique_hostname = NULL;
5554
receiver.comp_host = NULL;
5555
/* unlock connection list */
5556
pthread_mutex_unlock(handle->connection_list_mutex);
5557
return CL_RETVAL_NO_PORT_ERROR;
5561
while (still_in_list == 1) {
5562
int message_sent = 0;
5565
/* lock handle connection list */
5566
cl_raw_list_lock(handle->connection_list);
5568
elem = cl_connection_list_get_elem_endpoint(handle->connection_list, &receiver);
5571
connection = elem->connection;
5574
if ( shutdown_received == 1 ) {
5575
if (connection->ccm_received == 0) {
5576
/* This must be a new connection ( initiated from other endpoint ), so
5577
we return that connection is already open */
5578
cl_raw_list_unlock(handle->connection_list);
5579
free(unique_hostname);
5580
free(receiver.hash_id);
5581
unique_hostname = NULL;
5582
receiver.comp_host = NULL;
5584
pthread_mutex_unlock(handle->connection_list_mutex);
5585
CL_LOG(CL_LOG_INFO,"This is a new connected client, don't reopen, use new connection");
5586
return CL_RETVAL_OK;
5589
if ( (connection->connection_state == CL_CONNECTED && connection->connection_sub_state == CL_COM_WORK) ||
5590
connection->connection_state == CL_CONNECTING ||
5591
connection->connection_state == CL_DISCONNECTED) {
5592
/* This must be a new connection ( initiated from other endpoint ), so
5593
we return that connection is already open */
5594
cl_raw_list_unlock(handle->connection_list);
5595
free(unique_hostname);
5596
free(receiver.hash_id);
5597
unique_hostname = NULL;
5598
receiver.comp_host = NULL;
5600
pthread_mutex_unlock(handle->connection_list_mutex);
5601
CL_LOG(CL_LOG_INFO,"This is a new connected client, don't reopen, use new connection");
5602
return CL_RETVAL_OK;
5606
CL_LOG(CL_LOG_WARNING,"connection still alive ...");
5607
CL_LOG_INT(CL_LOG_WARNING,"receive buffer:",(int)cl_raw_list_get_elem_count(connection->received_message_list) );
5608
CL_LOG_INT(CL_LOG_WARNING,"send buffer :",(int)cl_raw_list_get_elem_count(connection->send_message_list) );
5610
if ( connection->ccm_received == 1 &&
5611
cl_raw_list_get_elem_count(connection->received_message_list) == 0 &&
5612
cl_raw_list_get_elem_count(connection->send_message_list) == 0 ) {
5613
CL_LOG(CL_LOG_WARNING,"received ccm");
5614
connection->ccm_received = 2;
5615
connection->connection_sub_state = CL_COM_SENDING_CCRM;
5616
cl_commlib_send_ccrm_message(connection);
5618
CL_LOG(CL_LOG_WARNING,"sending ccrm");
5621
/* There are messages to read for application, return */
5622
if ( cl_raw_list_get_elem_count(connection->received_message_list) != 0 ) {
5623
messages_for_app = CL_TRUE;
5626
gettimeofday(&now,NULL);
5627
if (connection->shutdown_timeout <= now.tv_sec || cl_com_get_ignore_timeouts_flag() == CL_TRUE) {
5628
CL_LOG(CL_LOG_WARNING,"got timeout while waiting for connection close");
5629
connection->connection_state = CL_CLOSING;
5630
connection->connection_sub_state = CL_COM_DO_SHUTDOWN;
5631
cl_raw_list_unlock(handle->connection_list);
5632
free(unique_hostname);
5633
free(receiver.hash_id);
5634
unique_hostname = NULL;
5635
receiver.comp_host = NULL;
5637
/* unlock connection list */
5638
pthread_mutex_unlock(handle->connection_list_mutex);
5639
return CL_RETVAL_CONNECTION_GOING_DOWN;
5642
cl_raw_list_unlock(handle->connection_list);
5644
if (still_in_list == 1) {
5645
switch(cl_com_create_threads) {
5647
CL_LOG(CL_LOG_INFO,"no threads enabled");
5648
cl_commlib_trigger(handle, 1);
5651
/* unlock connection list */
5652
if (message_sent != 0) {
5653
cl_thread_trigger_event(handle->write_thread);
5655
cl_thread_wait_for_thread_condition(handle->read_condition,
5656
handle->select_sec_timeout,
5657
handle->select_usec_timeout);
5660
/* at this point the handle->connection_list must be unlocked */
5663
if ( messages_for_app == CL_TRUE ) {
5664
free(unique_hostname);
5665
free(receiver.hash_id);
5666
unique_hostname = NULL;
5667
receiver.comp_host = NULL;
5669
pthread_mutex_unlock(handle->connection_list_mutex);
5670
return CL_RETVAL_MESSAGE_IN_BUFFER;
5673
CL_LOG(CL_LOG_WARNING,"connection is down! Try to reopen ...");
5677
/* here a new connection setup is done */
5678
CL_LOG(CL_LOG_INFO,"open new connection");
5680
ret_val = cl_com_setup_connection(handle, &new_con);
5681
if (ret_val != CL_RETVAL_OK) {
5682
CL_LOG(CL_LOG_ERROR,"could not setup connection");
5683
cl_com_close_connection(&new_con);
5684
free(unique_hostname);
5685
free(receiver.hash_id);
5686
unique_hostname = NULL;
5687
receiver.comp_host = NULL;
5689
/* unlock connection list */
5690
pthread_mutex_unlock(handle->connection_list_mutex);
5694
/* now open connection to the endpoint */
5695
local_endpoint = cl_com_dup_endpoint(handle->local);
5696
remote_endpoint = cl_com_dup_endpoint(&receiver);
5698
ret_val = cl_com_open_connection(new_con,
5699
handle->open_connection_timeout,
5703
cl_com_free_endpoint(&remote_endpoint);
5704
cl_com_free_endpoint(&local_endpoint);
5706
if (ret_val != CL_RETVAL_OK && ret_val != CL_RETVAL_UNCOMPLETE_WRITE ) {
5707
CL_LOG(CL_LOG_ERROR,"could not open connection");
5708
cl_com_close_connection(&new_con);
5709
free(unique_hostname);
5710
free(receiver.hash_id);
5711
unique_hostname = NULL;
5712
receiver.comp_host = NULL;
5714
/* unlock connection list */
5715
pthread_mutex_unlock(handle->connection_list_mutex);
5717
/* do this for compatibility to remote clients, which
5718
will never immediately will return a connect error */
5719
switch(cl_com_create_threads) {
5721
CL_LOG(CL_LOG_INFO,"no threads enabled");
5722
cl_commlib_trigger(handle, 1);
5725
cl_thread_trigger_event(handle->read_thread);
5726
cl_commlib_trigger(handle, 1); /* TODO: check if this is ok ??? */
5732
new_con->handler = handle;
5735
/* lock connection list */
5736
cl_raw_list_lock(handle->connection_list);
5738
/* Check if this connection is unique */
5739
elem = cl_connection_list_get_elem_endpoint(handle->connection_list, &receiver);
5742
/* endpoint is unique, add it to connection list */
5743
ret_val = cl_connection_list_append_connection(handle->connection_list, new_con, 0);
5744
cl_raw_list_unlock(handle->connection_list);
5746
if ( elem->connection->connection_state != CL_CLOSING ) {
5747
CL_LOG(CL_LOG_INFO,"try to open connection to already connected endpoint");
5749
cl_raw_list_unlock(handle->connection_list);
5750
cl_com_close_connection(&new_con);
5751
free(unique_hostname);
5752
free(receiver.hash_id);
5753
unique_hostname = NULL;
5754
receiver.comp_host = NULL;
5756
/* unlock connection list */
5757
pthread_mutex_unlock(handle->connection_list_mutex);
5758
return CL_RETVAL_OK;
5760
CL_LOG(CL_LOG_ERROR,"client not unique error, can't add opened connection into connection list");
5761
cl_raw_list_unlock(handle->connection_list);
5762
cl_com_close_connection(&new_con);
5763
free(unique_hostname);
5764
free(receiver.hash_id);
5765
unique_hostname = NULL;
5766
receiver.comp_host = NULL;
5768
/* unlock connection list */
5769
pthread_mutex_unlock(handle->connection_list_mutex);
5770
return CL_RETVAL_ENDPOINT_NOT_UNIQUE;
5774
free(unique_hostname);
5775
free(receiver.hash_id);
5776
unique_hostname = NULL;
5777
receiver.comp_host = NULL;
5779
CL_LOG(CL_LOG_INFO,"new connection created");
5780
handle->statistic->new_connections = handle->statistic->new_connections + 1;
5781
/* unlock connection list */
5782
pthread_mutex_unlock(handle->connection_list_mutex);
5784
switch(cl_com_create_threads) {
5786
CL_LOG(CL_LOG_INFO,"no threads enabled");
5787
cl_commlib_trigger(handle, 1);
5790
/* new connection, trigger read_thread */
5791
cl_thread_trigger_event(handle->write_thread);
5792
cl_thread_trigger_event(handle->read_thread);
5800
#ifdef __CL_FUNCTION__
5801
#undef __CL_FUNCTION__
5803
#define __CL_FUNCTION__ "cl_commlib_close_connection()"
5804
int cl_commlib_close_connection(cl_com_handle_t* handle,char* un_resolved_hostname, char* component_name, unsigned long component_id, cl_bool_t return_for_messages) {
5806
int return_value = CL_RETVAL_OK;
5807
cl_bool_t trigger_write = CL_FALSE;
5808
char* unique_hostname = NULL;
5809
struct in_addr in_addr;
5810
cl_com_endpoint_t receiver;
5811
cl_connection_list_elem_t* elem = NULL;
5812
cl_com_connection_t* connection = NULL;
5813
cl_app_message_queue_elem_t* mq_elem = NULL;
5814
int mq_return_value = CL_RETVAL_OK;
5816
cl_commlib_check_callback_functions();
5818
if ( handle == NULL) {
5819
CL_LOG(CL_LOG_ERROR,cl_get_error_text(CL_RETVAL_HANDLE_NOT_FOUND));
5820
return CL_RETVAL_HANDLE_NOT_FOUND;
5822
/* check endpoint parameters: un_resolved_hostname , componenet_name and componenet_id */
5823
if (un_resolved_hostname == NULL || component_name == NULL || component_id == 0) {
5824
CL_LOG(CL_LOG_ERROR,cl_get_error_text(CL_RETVAL_UNKNOWN_ENDPOINT));
5825
return CL_RETVAL_UNKNOWN_ENDPOINT;
5827
/* resolve hostname */
5828
return_value = cl_com_cached_gethostbyname(un_resolved_hostname, &unique_hostname, &in_addr, NULL, NULL);
5829
if (return_value != CL_RETVAL_OK) {
5830
CL_LOG(CL_LOG_ERROR,cl_get_error_text(return_value));
5831
return return_value;
5833
/* setup endpoint */
5834
receiver.comp_host = unique_hostname;
5835
receiver.comp_name = component_name;
5836
receiver.comp_id = component_id;
5837
receiver.addr.s_addr = in_addr.s_addr;
5838
receiver.hash_id = cl_create_endpoint_string(&receiver);
5839
if (receiver.hash_id == NULL) {
5840
free(unique_hostname);
5841
return CL_RETVAL_MALLOC;
5844
/* flush send message queue */
5845
cl_raw_list_lock(handle->send_message_queue);
5846
while((mq_elem = cl_app_message_queue_get_first_elem(handle->send_message_queue)) != NULL) {
5847
CL_LOG(CL_LOG_INFO,"flushing send message queue ...");
5849
mq_return_value = cl_commlib_send_message_to_endpoint(handle, mq_elem->snd_destination,
5850
mq_elem->snd_ack_type, mq_elem->snd_data,
5851
mq_elem->snd_size, mq_elem->snd_response_mid,
5853
/* remove queue entries */
5854
cl_raw_list_remove_elem(handle->send_message_queue, mq_elem->raw_elem);
5855
if (mq_return_value != CL_RETVAL_OK) {
5856
CL_LOG_STR(CL_LOG_ERROR,"can't send message:", cl_get_error_text(mq_return_value));
5857
free(mq_elem->snd_data);
5859
cl_com_free_endpoint(&(mq_elem->snd_destination));
5862
cl_raw_list_unlock(handle->send_message_queue);
5865
/* lock handle connection list */
5866
cl_raw_list_lock(handle->connection_list);
5869
elem = cl_connection_list_get_elem_endpoint(handle->connection_list, &receiver);
5871
connection = elem->connection;
5872
if (connection->data_flow_type == CL_CM_CT_MESSAGE) {
5873
if (connection->connection_state == CL_CONNECTED &&
5874
connection->connection_sub_state == CL_COM_WORK &&
5875
connection->ccm_received == 0) {
5876
cl_commlib_send_ccm_message(connection);
5877
trigger_write = CL_TRUE;
5878
connection->connection_sub_state = CL_COM_SENDING_CCM;
5879
CL_LOG_STR(CL_LOG_WARNING,"closing connection to host:", connection->remote->comp_host );
5880
CL_LOG_STR(CL_LOG_WARNING,"component name: ", connection->remote->comp_name );
5881
CL_LOG_INT(CL_LOG_WARNING,"component id: ", (int)connection->remote->comp_id );
5884
} else if (connection->data_flow_type == CL_CM_CT_STREAM) {
5885
CL_LOG(CL_LOG_WARNING,"closing stream connection");
5886
CL_LOG_STR(CL_LOG_WARNING,"closing connection to host:", connection->remote->comp_host );
5887
CL_LOG_STR(CL_LOG_WARNING,"component name: ", connection->remote->comp_name );
5888
CL_LOG_INT(CL_LOG_WARNING,"component id: ", (int)connection->remote->comp_id );
5889
connection->connection_state = CL_CLOSING;
5890
connection->connection_sub_state = CL_COM_DO_SHUTDOWN;
5895
cl_raw_list_unlock(handle->connection_list);
5897
if ( trigger_write == CL_TRUE ) {
5898
switch(cl_com_create_threads) {
5899
case CL_NO_THREAD: {
5900
CL_LOG(CL_LOG_INFO,"no threads enabled");
5901
/* we just want to trigger write , no wait for read*/
5902
cl_commlib_trigger(handle, 1);
5905
case CL_RW_THREAD: {
5906
/* we just want to trigger write , no wait for read*/
5907
cl_thread_trigger_event(handle->write_thread);
5913
/* Wait for removal of connection */
5914
cl_bool_t connection_removed = CL_FALSE;
5915
cl_bool_t do_return_after_trigger = CL_FALSE;
5917
while ( connection_removed == CL_FALSE ) {
5918
connection_removed = CL_TRUE;
5920
/* is connection still in list ??? */
5921
cl_raw_list_lock(handle->connection_list);
5923
elem = cl_connection_list_get_elem_endpoint(handle->connection_list, &receiver);
5925
cl_message_list_elem_t* message_elem = NULL;
5926
cl_message_list_elem_t* current_message_elem = NULL;
5928
connection = elem->connection;
5929
connection_removed = CL_FALSE;
5931
cl_raw_list_lock(connection->received_message_list);
5932
if (cl_raw_list_get_elem_count(connection->received_message_list) > 0) {
5933
message_elem = cl_message_list_get_first_elem(connection->received_message_list);
5934
while(message_elem) {
5935
/* set current message element */
5936
current_message_elem = message_elem;
5938
/* get next message */
5939
message_elem = cl_message_list_get_next_elem(message_elem);
5941
if (current_message_elem->message->message_state == CL_MS_READY) {
5942
/* there are messages in ready to deliver state for this connection */
5943
if (return_for_messages == CL_TRUE) {
5944
do_return_after_trigger = CL_TRUE;
5946
cl_com_message_t* message = NULL;
5948
/* remove message from received message list*/
5949
message = current_message_elem->message;
5950
cl_message_list_remove_receive(connection, message, 0);
5952
/* decrease counter for ready messages */
5953
pthread_mutex_lock(handle->messages_ready_mutex);
5954
handle->messages_ready_for_read = handle->messages_ready_for_read - 1;
5955
cl_app_message_queue_remove(handle->received_message_queue, connection, 1, CL_FALSE);
5956
pthread_mutex_unlock(handle->messages_ready_mutex);
5958
/* delete message */
5959
cl_com_free_message(&message);
5965
cl_raw_list_unlock(connection->received_message_list);
5967
cl_raw_list_unlock(handle->connection_list);
5969
if ( connection_removed == CL_FALSE ) {
5970
switch(cl_com_create_threads) {
5972
CL_LOG(CL_LOG_INFO,"no threads enabled");
5973
cl_commlib_trigger(handle, 1);
5976
/* we just want to trigger write , no wait for read*/
5977
cl_thread_trigger_event(handle->write_thread);
5981
if (do_return_after_trigger == CL_TRUE) {
5982
free(unique_hostname);
5983
free(receiver.hash_id);
5984
return CL_RETVAL_MESSAGE_IN_BUFFER;
5987
free(unique_hostname);
5988
free(receiver.hash_id);
5989
return CL_RETVAL_OK;
5991
free(unique_hostname);
5992
free(receiver.hash_id);
5993
return CL_RETVAL_CONNECTION_NOT_FOUND;
5996
#ifdef __CL_FUNCTION__
5997
#undef __CL_FUNCTION__
5999
#define __CL_FUNCTION__ "cl_commlib_get_endpoint_status()"
6000
int cl_commlib_get_endpoint_status(cl_com_handle_t* handle, char* un_resolved_hostname,
6001
char* component_name, unsigned long component_id,
6002
cl_com_SIRM_t** status)
6006
cl_com_connection_t* connection = NULL;
6007
cl_connection_list_elem_t* elem = NULL;
6008
int message_added = 0;
6009
int new_message_added = 0;
6010
unsigned long my_mid = 0;
6011
int return_value = CL_RETVAL_OK;
6012
cl_com_endpoint_t receiver;
6013
char* unique_hostname = NULL;
6014
struct in_addr in_addr;
6016
int found_message = 0;
6018
cl_message_list_elem_t* message_list_elem = NULL;
6019
cl_message_list_elem_t* next_message_list_elem = NULL;
6021
cl_com_message_t* message = NULL;
6023
cl_commlib_check_callback_functions();
6025
if ( handle == NULL || status == NULL) {
6026
CL_LOG(CL_LOG_ERROR,cl_get_error_text(CL_RETVAL_HANDLE_NOT_FOUND));
6027
return CL_RETVAL_PARAMS;
6030
/* check endpoint parameters: un_resolved_hostname , componenet_name and componenet_id */
6031
if (un_resolved_hostname == NULL || component_name == NULL || component_id == 0) {
6032
CL_LOG(CL_LOG_ERROR,cl_get_error_text(CL_RETVAL_UNKNOWN_ENDPOINT));
6033
return CL_RETVAL_UNKNOWN_ENDPOINT;
6036
if (*status != NULL) {
6037
CL_LOG(CL_LOG_ERROR,"expected empty status pointer address");
6038
return CL_RETVAL_PARAMS;
6041
CL_LOG_STR_STR_INT(CL_LOG_INFO, "ping",un_resolved_hostname, component_name, (int)component_id);
6043
/* resolve hostname */
6044
return_value = cl_com_cached_gethostbyname(un_resolved_hostname, &unique_hostname, &in_addr, NULL, NULL);
6045
if (return_value != CL_RETVAL_OK) {
6046
CL_LOG(CL_LOG_ERROR,cl_get_error_text(return_value));
6047
return return_value;
6050
/* setup endpoint */
6051
receiver.comp_host = unique_hostname;
6052
receiver.comp_name = component_name;
6053
receiver.comp_id = component_id;
6054
receiver.addr.s_addr = in_addr.s_addr;
6055
receiver.hash_id = cl_create_endpoint_string(&receiver);
6056
if (receiver.hash_id == NULL) {
6057
free(unique_hostname);
6058
return CL_RETVAL_MALLOC;
6061
while(retry_send != 0) {
6063
/* lock handle connection list */
6064
cl_raw_list_lock(handle->connection_list);
6067
elem = cl_connection_list_get_elem_endpoint(handle->connection_list, &receiver);
6069
connection = elem->connection;
6071
if (connection->ccm_received != 0) {
6072
/* we have sent connection close response, do not accept any new message */
6073
/* we wait till connection is down and try to reconnect */
6074
CL_LOG(CL_LOG_ERROR,"connection is going down now, can't send message (ccrm sent)");
6075
} else if (connection->connection_state == CL_CONNECTED && connection->connection_sub_state != CL_COM_WORK) {
6076
CL_LOG(CL_LOG_WARNING,"connection is going down now, can't send message");
6078
return_value = cl_commlib_send_sim_message(connection, &my_mid);
6080
if (return_value != CL_RETVAL_OK) {
6081
cl_raw_list_unlock(handle->connection_list);
6082
free(unique_hostname);
6083
free(receiver.hash_id);
6084
return return_value;
6089
cl_raw_list_unlock(handle->connection_list);
6091
/* if message is not added, the connection was not found -> try to open it */
6092
if (message_added != 1) {
6094
return_value = cl_commlib_open_connection(handle, un_resolved_hostname, component_name, component_id);
6095
if (return_value != CL_RETVAL_OK) {
6096
free(unique_hostname);
6097
free(receiver.hash_id);
6098
CL_LOG_STR(CL_LOG_ERROR,"cl_commlib_open_connection() returned: ",cl_get_error_text(return_value));
6099
return return_value;
6101
if (retry_send >= 3) {
6102
CL_LOG(CL_LOG_ERROR,"can't open connection, don't retry to send this message");
6106
retry_send = 0; /* break */
6111
if (message_added == 1) {
6113
switch(cl_com_create_threads) {
6114
case CL_NO_THREAD: {
6115
CL_LOG(CL_LOG_INFO,"no threads enabled");
6116
/* we just want to trigger write , no wait for read*/
6117
cl_commlib_trigger(handle, 1);
6120
case CL_RW_THREAD: {
6121
/* we just want to trigger write , no wait for read*/
6122
cl_thread_trigger_event(handle->write_thread);
6127
free(unique_hostname);
6128
free(receiver.hash_id);
6129
return CL_RETVAL_SEND_ERROR;
6132
CL_LOG_INT(CL_LOG_WARNING,"waiting for SIRM with id",(int)my_mid);
6133
while(do_stop == 0) {
6135
/* lock handle connection list */
6136
cl_raw_list_lock(handle->connection_list);
6138
elem = cl_connection_list_get_elem_endpoint(handle->connection_list, &receiver);
6141
connection = elem->connection;
6142
/* check for message acknowledge */
6143
cl_raw_list_lock(connection->send_message_list);
6145
message_list_elem = cl_message_list_get_first_elem(connection->send_message_list);
6146
while(message_list_elem != NULL && found_message == 0) {
6147
message = message_list_elem->message;
6148
next_message_list_elem = cl_message_list_get_next_elem(message_list_elem );
6149
if (message->message_id == my_mid) {
6152
if (message->message_sirm != NULL) {
6153
cl_message_list_remove_send(connection, message, 0);
6154
*status = message->message_sirm;
6155
message->message_sirm = NULL;
6156
cl_com_free_message(&message);
6157
cl_raw_list_unlock(connection->send_message_list);
6159
if (connection->ccm_received == 1) {
6160
CL_LOG(CL_LOG_INFO,"received ccm");
6162
CL_LOG_INT(CL_LOG_WARNING,"receive buffer:",(int)cl_raw_list_get_elem_count(connection->received_message_list) );
6163
CL_LOG_INT(CL_LOG_WARNING,"send buffer :",(int)cl_raw_list_get_elem_count(connection->send_message_list) );
6164
CL_LOG_INT(CL_LOG_WARNING,"ccm_received :",(int)connection->ccm_received);
6166
if( cl_raw_list_get_elem_count(connection->send_message_list) == 0 &&
6167
cl_raw_list_get_elem_count(connection->received_message_list) == 0 ) {
6168
connection->ccm_received = 2;
6169
connection->connection_sub_state = CL_COM_SENDING_CCRM;
6170
/* disable #if 1 if you want to test a client, that is frozen! */
6171
cl_commlib_send_ccrm_message(connection);
6172
new_message_added = 1;
6173
CL_LOG(CL_LOG_WARNING,"sending ccrm");
6175
CL_LOG(CL_LOG_WARNING,"can't send ccrm, still messages in buffer");
6180
cl_raw_list_unlock(handle->connection_list);
6181
free(unique_hostname);
6182
free(receiver.hash_id);
6183
CL_LOG_INT(CL_LOG_WARNING,"got SIRM for SIM with id:",(int)my_mid);
6185
if (new_message_added) {
6186
switch(cl_com_create_threads) {
6188
CL_LOG(CL_LOG_INFO,"no threads enabled");
6189
/* we just want to trigger write , no wait for read*/
6190
cl_commlib_trigger(handle, 1);
6193
/* we just want to trigger write , no wait for read*/
6194
cl_thread_trigger_event(handle->write_thread);
6199
return CL_RETVAL_OK;
6201
CL_LOG_INT(CL_LOG_WARNING,"no SRIM for SIM with id", (int)my_mid);
6204
message_list_elem = next_message_list_elem;
6206
cl_raw_list_unlock(connection->send_message_list);
6208
CL_LOG(CL_LOG_ERROR,"no connection FOUND");
6209
cl_raw_list_unlock(handle->connection_list);
6210
free(unique_hostname);
6211
free(receiver.hash_id);
6212
return CL_RETVAL_CONNECTION_NOT_FOUND;
6214
cl_raw_list_unlock(handle->connection_list);
6216
if (found_message == 0) {
6217
CL_LOG_INT(CL_LOG_ERROR,"SIM not found or removed because of SIRM ack timeout", (int)my_mid);
6218
free(unique_hostname);
6219
free(receiver.hash_id);
6220
return CL_RETVAL_MESSAGE_ACK_ERROR; /* message not found or removed because of ack timeout */
6223
switch(cl_com_create_threads) {
6225
CL_LOG(CL_LOG_INFO,"no threads enabled");
6226
cl_commlib_trigger(handle, 1);
6229
cl_thread_wait_for_thread_condition(handle->read_condition,
6230
handle->select_sec_timeout,
6231
handle->select_usec_timeout);
6235
return CL_RETVAL_UNKNOWN;
6239
#ifdef __CL_FUNCTION__
6240
#undef __CL_FUNCTION__
6242
#define __CL_FUNCTION__ "cl_commlib_send_message_to_endpoint()"
6243
static int cl_commlib_send_message_to_endpoint(cl_com_handle_t* handle,
6244
cl_com_endpoint_t* endpoint,
6245
cl_xml_ack_type_t ack_type,
6247
unsigned long size ,
6248
unsigned long response_mid,
6249
unsigned long tag) {
6251
cl_com_connection_t* connection = NULL;
6252
cl_connection_list_elem_t* elem = NULL;
6253
cl_com_message_t* message = NULL;
6254
int message_added = 0;
6255
int return_value = CL_RETVAL_OK;
6258
/* check acknowledge method */
6259
if (ack_type == CL_MIH_MAT_UNDEFINED || data == NULL || size == 0 ) {
6260
CL_LOG(CL_LOG_ERROR,cl_get_error_text(CL_RETVAL_PARAMS));
6261
return CL_RETVAL_PARAMS;
6264
if ( handle == NULL) {
6265
CL_LOG(CL_LOG_ERROR,cl_get_error_text(CL_RETVAL_HANDLE_NOT_FOUND));
6266
return CL_RETVAL_HANDLE_NOT_FOUND;
6269
if ( endpoint == NULL ) {
6270
CL_LOG(CL_LOG_ERROR,cl_get_error_text(CL_RETVAL_UNKNOWN_ENDPOINT));
6271
return CL_RETVAL_UNKNOWN_ENDPOINT;
6274
if ( endpoint->comp_id == 0) {
6275
CL_LOG(CL_LOG_ERROR,cl_get_error_text(CL_RETVAL_UNKNOWN_ENDPOINT));
6276
return CL_RETVAL_UNKNOWN_ENDPOINT;
6279
if ( handle->do_shutdown != 0) {
6280
CL_LOG(CL_LOG_WARNING,"handle is going down, don't send message");
6281
return CL_RETVAL_HANDLE_SHUTDOWN_IN_PROGRESS;
6284
while(retry_send != 0) {
6286
/* lock handle connection list */
6287
cl_raw_list_lock(handle->connection_list);
6288
elem = cl_connection_list_get_elem_endpoint(handle->connection_list, endpoint);
6290
connection = elem->connection;
6292
if (connection->was_accepted == CL_TRUE &&
6293
connection->crm_state != CL_CRM_CS_UNDEFINED &&
6294
connection->crm_state != CL_CRM_CS_CONNECTED) {
6295
CL_LOG(CL_LOG_WARNING,"ignore connection with unexpected connection state");
6297
/* send message to client (no broadcast) */
6298
if (connection->ccm_received != 0 ||
6299
(connection->connection_state == CL_CONNECTED && connection->connection_sub_state != CL_COM_WORK)) {
6300
CL_LOG(CL_LOG_WARNING,"connection is going down now, can't send message");
6302
if (response_mid > 0 && response_mid > connection->last_received_message_id) {
6303
CL_LOG_INT(CL_LOG_DEBUG,"last_received_message_id:", (int)connection->last_received_message_id );
6304
CL_LOG_INT(CL_LOG_DEBUG,"last_send_message_id :", (int)connection->last_send_message_id);
6305
CL_LOG_INT(CL_LOG_DEBUG,"response_mid to send :", (int)response_mid);
6307
CL_LOG(CL_LOG_ERROR,"Protocol error: haven't received such a high message id till now");
6308
cl_raw_list_unlock(handle->connection_list);
6309
return CL_RETVAL_PROTOCOL_ERROR;
6312
CL_LOG_STR_STR_INT(CL_LOG_INFO, "sending it to: ", connection->remote->comp_host,
6313
connection->remote->comp_name,
6314
(int)connection->remote->comp_id);
6316
return_value = cl_com_setup_message(&message, connection, data, size, ack_type,response_mid,tag);
6318
if (return_value != CL_RETVAL_OK) {
6319
cl_raw_list_unlock(handle->connection_list);
6320
return return_value;
6323
return_value = cl_message_list_append_send(connection, message, 1);
6324
if (return_value != CL_RETVAL_OK) {
6325
cl_com_free_message(&message);
6326
cl_raw_list_unlock(handle->connection_list);
6327
return return_value;
6333
cl_raw_list_unlock(handle->connection_list);
6335
/* if message is not added, the connection was not found -> try to open it */
6336
if (message_added != 1) {
6338
return_value = cl_commlib_open_connection(handle, endpoint->comp_host, endpoint->comp_name, endpoint->comp_id );
6339
if (return_value != CL_RETVAL_OK) {
6340
CL_LOG_STR(CL_LOG_ERROR,"cl_commlib_open_connection() returned: ",cl_get_error_text(return_value));
6341
return return_value;
6343
if (retry_send >= 3) {
6344
CL_LOG(CL_LOG_ERROR,"can't open connection, don't retry to send this message");
6348
retry_send = 0; /* break */
6352
if (message_added != 1) {
6353
return CL_RETVAL_SEND_ERROR;
6356
return return_value;
6360
#ifdef __CL_FUNCTION__
6361
#undef __CL_FUNCTION__
6363
#define __CL_FUNCTION__ "cl_commlib_send_message()"
6365
cl_commlib_send_message(cl_com_handle_t* handle, char *un_resolved_hostname,
6366
char *component_name, unsigned long component_id,
6367
cl_xml_ack_type_t ack_type, cl_byte_t **data,
6368
unsigned long size, unsigned long *mid,
6369
unsigned long response_mid, unsigned long tag ,
6370
cl_bool_t copy_data, cl_bool_t wait_for_ack)
6372
cl_com_connection_t* connection = NULL;
6373
cl_connection_list_elem_t* elem = NULL;
6374
cl_com_message_t* message = NULL;
6375
int message_added = 0;
6376
unsigned long my_mid = 0;
6377
int return_value = CL_RETVAL_OK;
6378
cl_com_endpoint_t receiver;
6379
char* unique_hostname = NULL;
6380
struct in_addr in_addr;
6381
cl_byte_t* help_data = NULL;
6384
cl_commlib_check_callback_functions();
6386
/* check acknowledge method */
6387
if (ack_type == CL_MIH_MAT_UNDEFINED || data == NULL || *data == NULL || size == 0 ) {
6388
CL_LOG(CL_LOG_ERROR,cl_get_error_text(CL_RETVAL_PARAMS));
6389
return CL_RETVAL_PARAMS;
6392
if ( handle == NULL) {
6393
CL_LOG(CL_LOG_ERROR,cl_get_error_text(CL_RETVAL_HANDLE_NOT_FOUND));
6394
return CL_RETVAL_HANDLE_NOT_FOUND;
6397
/* check endpoint parameters: un_resolved_hostname , componenet_name and componenet_id */
6398
if (un_resolved_hostname == NULL || component_name == NULL || component_id == 0) {
6399
CL_LOG(CL_LOG_ERROR,cl_get_error_text(CL_RETVAL_UNKNOWN_ENDPOINT));
6400
return CL_RETVAL_UNKNOWN_ENDPOINT;
6403
/* make a copy of the message data (if wished) */
6404
if (copy_data == CL_TRUE) {
6405
help_data = (cl_byte_t*)malloc((sizeof(cl_byte_t)*size));
6406
if (help_data == NULL) {
6407
return CL_RETVAL_MALLOC;
6409
memcpy(help_data, *data, (sizeof(cl_byte_t)*size));
6417
* The send_message_queue can only be used if the following parameters are not requested
6420
* - mid != NULL : The caller wants the message id which can't be set before
6421
* the message is added to the connection's message list
6423
* - wait_for_ack == CL_TRUE : The caller wants to wait for the response, we have to do
6424
* the transaction at once
6429
if (mid == NULL && wait_for_ack == CL_FALSE && cl_com_create_threads != CL_NO_THREAD ) {
6430
cl_com_endpoint_t* destination_endpoint = NULL;
6432
/* using send_message_queue for this message */
6433
CL_LOG_STR_STR_INT(CL_LOG_INFO, "add message into send queue for: ", un_resolved_hostname, component_name, (int)component_id);
6435
/* resolve hostname and create endpoint structure */
6436
return_value = cl_com_cached_gethostbyname(un_resolved_hostname, &unique_hostname, &in_addr, NULL, NULL);
6437
if (return_value != CL_RETVAL_OK) {
6438
CL_LOG(CL_LOG_ERROR,cl_get_error_text(return_value));
6439
if (copy_data == CL_TRUE) {
6442
return return_value;
6445
destination_endpoint = cl_com_create_endpoint(unique_hostname, component_name, component_id, &in_addr);
6446
free(unique_hostname);
6447
unique_hostname = NULL;
6449
if (destination_endpoint == NULL) {
6450
if (copy_data == CL_TRUE) {
6453
return CL_RETVAL_MALLOC;
6456
return_value = cl_app_message_queue_append(handle->send_message_queue, NULL,
6457
destination_endpoint, ack_type,
6458
help_data, size, response_mid, tag, 1);
6459
if (return_value != CL_RETVAL_OK) {
6460
CL_LOG(CL_LOG_ERROR,cl_get_error_text(return_value));
6461
if (copy_data == CL_TRUE) {
6464
return return_value;
6467
cl_thread_trigger_event(handle->write_thread);
6471
CL_LOG_STR_STR_INT(CL_LOG_INFO, "add new message for: ", un_resolved_hostname, component_name, (int)component_id);
6473
/* resolve hostname */
6474
return_value = cl_com_cached_gethostbyname(un_resolved_hostname, &unique_hostname, &in_addr, NULL, NULL);
6475
if (return_value != CL_RETVAL_OK) {
6476
CL_LOG(CL_LOG_ERROR,cl_get_error_text(return_value));
6477
if (copy_data == CL_TRUE) {
6480
return return_value;
6484
/* setup endpoint */
6485
receiver.comp_host = unique_hostname;
6486
receiver.comp_name = component_name;
6487
receiver.comp_id = component_id;
6488
receiver.addr.s_addr = in_addr.s_addr;
6489
receiver.hash_id = cl_create_endpoint_string(&receiver);
6490
if (receiver.hash_id == NULL) {
6491
free(unique_hostname);
6492
if (copy_data == CL_TRUE) {
6495
return CL_RETVAL_MALLOC;
6498
while(retry_send != 0) {
6500
/* lock handle connection list */
6501
cl_raw_list_lock(handle->connection_list);
6503
elem = cl_connection_list_get_elem_endpoint(handle->connection_list, &receiver);
6505
connection = elem->connection;
6507
if (connection->was_accepted == CL_TRUE &&
6508
connection->crm_state != CL_CRM_CS_UNDEFINED &&
6509
connection->crm_state != CL_CRM_CS_CONNECTED) {
6510
CL_LOG(CL_LOG_WARNING,"ignore connection with unexpected connection state");
6512
/* send message to client (no broadcast) */
6514
if (connection->ccm_received != 0 || (
6515
connection->connection_state == CL_CONNECTED && connection->connection_sub_state != CL_COM_WORK)) {
6516
CL_LOG(CL_LOG_WARNING,"connection is going down now, can't send message");
6517
} else if (response_mid > 0 && response_mid > connection->last_received_message_id ) {
6518
CL_LOG_INT(CL_LOG_DEBUG,"last_received_message_id:", (int)connection->last_received_message_id );
6519
CL_LOG_INT(CL_LOG_DEBUG,"last_send_message_id :", (int)connection->last_send_message_id);
6520
CL_LOG_INT(CL_LOG_DEBUG,"response_mid to send :", (int)response_mid);
6522
CL_LOG(CL_LOG_ERROR,"Protocol error: haven't received such a high message id till now");
6523
cl_raw_list_unlock(handle->connection_list);
6524
free(unique_hostname);
6525
free(receiver.hash_id);
6526
if (copy_data == CL_TRUE) {
6529
return CL_RETVAL_PROTOCOL_ERROR;
6532
CL_LOG_STR_STR_INT(CL_LOG_INFO, "sending it to: ", connection->remote->comp_host,
6533
connection->remote->comp_name,
6534
(int)connection->remote->comp_id);
6536
return_value = cl_com_setup_message(&message, connection, help_data, size, ack_type,response_mid,tag);
6538
if (return_value != CL_RETVAL_OK) {
6539
cl_raw_list_unlock(handle->connection_list);
6540
free(unique_hostname);
6541
free(receiver.hash_id);
6542
if (copy_data == CL_TRUE) {
6545
return return_value;
6548
my_mid = message->message_id;
6550
*mid = message->message_id;
6552
return_value = cl_message_list_append_send(connection, message, 1);
6553
if (return_value != CL_RETVAL_OK) {
6554
cl_com_free_message(&message);
6555
cl_raw_list_unlock(handle->connection_list);
6556
free(unique_hostname);
6557
free(receiver.hash_id);
6558
return return_value;
6564
cl_raw_list_unlock(handle->connection_list);
6566
/* if message is not added, the connection was not found -> try to open it */
6567
if (message_added != 1) {
6569
return_value = cl_commlib_open_connection(handle, un_resolved_hostname, component_name, component_id);
6570
if (return_value != CL_RETVAL_OK) {
6571
free(unique_hostname);
6572
free(receiver.hash_id);
6573
CL_LOG_STR(CL_LOG_ERROR,"cl_commlib_open_connection() returned: ", cl_get_error_text(return_value));
6574
if (copy_data == CL_TRUE) {
6577
return return_value;
6579
if (retry_send >= 3) {
6580
CL_LOG(CL_LOG_ERROR,"can't open connection, don't retry to send this message");
6584
retry_send = 0; /* break */
6588
if (message_added == 1) {
6589
switch(cl_com_create_threads) {
6591
CL_LOG(CL_LOG_INFO,"no threads enabled");
6592
/* we just want to trigger write , no wait for read*/
6593
cl_commlib_trigger(handle, 1);
6596
/* we just want to trigger write , no wait for read*/
6597
cl_thread_trigger_event(handle->write_thread);
6601
free(unique_hostname);
6602
free(receiver.hash_id);
6603
if (copy_data == CL_TRUE) {
6606
return CL_RETVAL_SEND_ERROR;
6609
if (ack_type == CL_MIH_MAT_NAK) {
6610
free(unique_hostname);
6611
free(receiver.hash_id);
6612
return CL_RETVAL_OK;
6615
if (wait_for_ack == CL_FALSE) {
6616
free(unique_hostname);
6617
free(receiver.hash_id);
6618
return CL_RETVAL_OK;
6621
CL_LOG_INT(CL_LOG_INFO,"message acknowledge expected, waiting for ack", (int)my_mid);
6622
return_value = cl_commlib_check_for_ack(handle, receiver.comp_host, component_name, component_id, my_mid, CL_TRUE);
6623
free(unique_hostname);
6624
free(receiver.hash_id);
6627
return return_value;
6636
#ifdef __CL_FUNCTION__
6637
#undef __CL_FUNCTION__
6639
#define __CL_FUNCTION__ "cl_commlib_get_last_message_time()"
6640
int cl_commlib_get_last_message_time(cl_com_handle_t* handle,
6641
const char* un_resolved_hostname, const char* component_name, unsigned long component_id,
6642
unsigned long* message_time) {
6644
char* unique_hostname = NULL;
6645
struct in_addr in_addr;
6647
cl_com_endpoint_t receiver;
6649
/* set time to 0 if endpoint not found, otherwise return last communication time */
6650
/* otherwise return error */
6655
if (handle == NULL || un_resolved_hostname == NULL || component_name == NULL) {
6656
return CL_RETVAL_PARAMS;
6659
if (component_id <= 0) {
6660
CL_LOG(CL_LOG_ERROR,"component id 0 is not allowed");
6661
return CL_RETVAL_PARAMS;
6664
/* resolve hostname */
6665
return_value = cl_com_cached_gethostbyname(un_resolved_hostname, &unique_hostname, &in_addr, NULL, NULL);
6666
if (return_value != CL_RETVAL_OK) {
6667
return return_value;
6670
/* setup endpoint */
6671
receiver.comp_host = unique_hostname;
6672
receiver.comp_name = (char *)component_name;
6673
receiver.comp_id = component_id;
6674
receiver.addr.s_addr = in_addr.s_addr;
6675
receiver.hash_id = cl_create_endpoint_string(&receiver);
6676
if (receiver.hash_id == NULL) {
6677
free(unique_hostname);
6678
return CL_RETVAL_MALLOC;
6681
return_value = cl_endpoint_list_get_last_touch_time(cl_com_get_endpoint_list(), &receiver, message_time);
6683
CL_LOG_STR(CL_LOG_DEBUG,"host :", receiver.comp_host);
6684
CL_LOG_STR(CL_LOG_DEBUG,"component :", receiver.comp_name);
6685
CL_LOG_INT(CL_LOG_DEBUG,"last transfer time:", (int)*message_time);
6688
free(unique_hostname);
6689
free(receiver.hash_id);
6691
return return_value;
6695
#ifdef __CL_FUNCTION__
6696
#undef __CL_FUNCTION__
6698
#define __CL_FUNCTION__ "cl_com_trigger_thread()"
6699
/* This thread is used for commlib specific calls (e.g. hostlist refresh) */
6700
static void *cl_com_trigger_thread(void *t_conf) {
6701
int ret_val = CL_RETVAL_OK;
6703
/* get pointer to cl_thread_settings_t struct */
6704
cl_thread_settings_t *thread_config = (cl_thread_settings_t*)t_conf;
6707
/* set thread config data */
6708
if (cl_thread_set_thread_config(thread_config) != CL_RETVAL_OK) {
6709
CL_LOG(CL_LOG_ERROR,"thread setup error");
6714
CL_LOG(CL_LOG_INFO, "starting initialization ...");
6716
ret_val = CL_RETVAL_OK;
6717
if (ret_val != CL_RETVAL_OK) {
6721
/* thread init done, trigger startup conditon variable*/
6722
cl_thread_func_startup(thread_config);
6723
CL_LOG(CL_LOG_INFO, "starting main loop ...");
6725
/* ok, thread main */
6726
while (do_exit == 0) {
6727
cl_thread_func_testcancel(thread_config);
6729
CL_LOG(CL_LOG_INFO,"trigger host list refresh ...");
6730
cl_com_host_list_refresh(cl_com_get_host_list());
6731
cl_com_endpoint_list_refresh(cl_com_get_endpoint_list());
6733
CL_LOG(CL_LOG_INFO,"wait for event ...");
6734
if ((ret_val = cl_thread_wait_for_event(thread_config,1,0 )) != CL_RETVAL_OK) { /* nothing to do */
6736
case CL_RETVAL_CONDITION_WAIT_TIMEOUT:
6737
CL_LOG(CL_LOG_INFO,"condition wait timeout");
6740
CL_LOG_STR( CL_LOG_INFO, ">got error<: ", cl_get_error_text(ret_val));
6746
CL_LOG(CL_LOG_INFO, "exiting ...");
6747
/* at least set exit state */
6748
cl_thread_func_cleanup(thread_config);
6753
#ifdef __CL_FUNCTION__
6754
#undef __CL_FUNCTION__
6756
#define __CL_FUNCTION__ "cl_com_handle_service_thread()"
6757
static void *cl_com_handle_service_thread(void *t_conf) {
6758
int ret_val = CL_RETVAL_OK;
6760
cl_com_handle_t* handle = NULL;
6763
/* get pointer to cl_thread_settings_t struct */
6764
cl_thread_settings_t *thread_config = (cl_thread_settings_t*)t_conf;
6766
/* set thread config data */
6767
if (cl_thread_set_thread_config(thread_config) != CL_RETVAL_OK) {
6768
CL_LOG(CL_LOG_ERROR,"thread setup error");
6773
CL_LOG(CL_LOG_INFO, "starting initialization ...");
6775
/* get handle from thread_config */
6776
handle = (cl_com_handle_t*) thread_config->thread_user_data;
6778
/* thread init done, trigger startup conditon variable*/
6779
cl_thread_func_startup(thread_config);
6780
CL_LOG(CL_LOG_INFO, "starting main loop ...");
6782
/* ok, thread main */
6783
while (do_exit == 0) {
6784
cl_thread_func_testcancel(thread_config);
6786
#ifdef CL_DO_COMMLIB_DEBUG
6789
cl_thread_list_elem_t* elem = NULL;
6791
gettimeofday(&now,NULL);
6793
cl_raw_list_lock(cl_com_thread_list);
6794
elem = cl_thread_list_get_first_elem(cl_com_thread_list);
6796
if (elem->thread_config->thread_last_cancel_test_time.tv_sec + 15 < now.tv_sec ) {
6797
CL_LOG_STR(CL_LOG_ERROR,"POSSIBLE DEADLOCK DETECTED (thread_list) => ",elem->thread_config->thread_name);
6799
elem = cl_thread_list_get_next_elem(elem);
6801
cl_raw_list_unlock(cl_com_thread_list);
6803
#endif /* CL_DO_COMMLIB_DEBUG */
6805
/* we don't want to force statistics update every one second */
6806
cl_commlib_calculate_statistic(handle, CL_FALSE, 1);
6807
/* ceck for debug clients */
6808
cl_commlib_handle_debug_clients(handle, CL_TRUE);
6809
/* do received message queue cleanup every second */
6810
cl_commlib_app_message_queue_cleanup(handle);
6812
/* there is nothing to do, wait for events */
6813
CL_LOG(CL_LOG_INFO,"wait for event ...");
6814
if ((ret_val = cl_thread_wait_for_event(thread_config,handle->select_sec_timeout, handle->select_usec_timeout)) != CL_RETVAL_OK) {
6816
case CL_RETVAL_CONDITION_WAIT_TIMEOUT:
6817
CL_LOG(CL_LOG_INFO,"condition wait timeout");
6820
CL_LOG_STR( CL_LOG_INFO, ">got error<: ", cl_get_error_text(ret_val));
6824
cl_thread_clear_events(thread_config);
6827
CL_LOG(CL_LOG_INFO, "exiting ...");
6829
/* at least set exit state */
6830
cl_thread_func_cleanup(thread_config);
6837
#ifdef __CL_FUNCTION__
6838
#undef __CL_FUNCTION__
6840
#define __CL_FUNCTION__ "cl_com_handle_read_thread()"
6841
static void *cl_com_handle_read_thread(void *t_conf) {
6842
int ret_val = CL_RETVAL_OK;
6845
int wait_for_events = 1;
6846
int select_sec_timeout = 0;
6847
int select_usec_timeout = 100*1000;
6848
cl_app_message_queue_elem_t* mq_elem = NULL;
6849
int mq_return_value = CL_RETVAL_OK;
6852
int message_received = 0;
6853
int trigger_write_thread = 0;
6854
cl_connection_list_elem_t* elem = NULL;
6855
char tmp_string[1024];
6858
cl_handle_list_elem_t* handle_elem = NULL;
6859
cl_com_handle_t* handle = NULL;
6861
/* get pointer to cl_thread_settings_t struct */
6862
cl_thread_settings_t *thread_config = (cl_thread_settings_t*)t_conf;
6865
/* set thread config data */
6866
if (cl_thread_set_thread_config(thread_config) != CL_RETVAL_OK) {
6867
CL_LOG(CL_LOG_ERROR,"thread setup error");
6872
CL_LOG(CL_LOG_INFO, "starting initialization ...");
6874
/* thread init done, trigger startup conditon variable*/
6875
cl_thread_func_startup(thread_config);
6876
CL_LOG(CL_LOG_INFO, "starting main loop ...");
6878
/* ok, thread main */
6879
while (do_exit == 0) {
6880
wait_for_events = 1;
6881
trigger_write_thread = 0;
6882
message_received = 0;
6885
cl_thread_func_testcancel(thread_config);
6887
if (handle == NULL) {
6888
cl_raw_list_lock(cl_com_handle_list);
6889
handle_elem = cl_handle_list_get_first_elem(cl_com_handle_list);
6890
while(handle_elem && handle == NULL ) {
6891
if (handle_elem->handle->read_thread == thread_config) {
6892
handle = handle_elem->handle;
6893
select_sec_timeout = handle->select_sec_timeout;
6894
select_usec_timeout = handle->select_usec_timeout;
6895
wait_for_events = 0;
6897
handle_elem = cl_handle_list_get_next_elem(handle_elem);
6899
cl_raw_list_unlock(cl_com_handle_list);
6904
/* check number of connections */
6905
cl_commlib_check_connection_count(handle);
6907
cl_connection_list_destroy_connections_to_close(handle);
6909
cl_raw_list_lock(handle->send_message_queue);
6910
while((mq_elem = cl_app_message_queue_get_first_elem(handle->send_message_queue)) != NULL) {
6911
mq_return_value = cl_commlib_send_message_to_endpoint(handle, mq_elem->snd_destination,
6912
mq_elem->snd_ack_type, mq_elem->snd_data,
6913
mq_elem->snd_size, mq_elem->snd_response_mid,
6915
/* remove queue entries */
6916
cl_raw_list_remove_elem(handle->send_message_queue, mq_elem->raw_elem);
6917
if (mq_return_value != CL_RETVAL_OK) {
6918
CL_LOG_STR(CL_LOG_ERROR,"can't send message:", cl_get_error_text(mq_return_value));
6919
free(mq_elem->snd_data);
6921
cl_com_free_endpoint(&(mq_elem->snd_destination));
6924
cl_raw_list_unlock(handle->send_message_queue);
6927
ret_val = cl_com_open_connection_request_handler(handle, handle->select_sec_timeout , handle->select_usec_timeout, CL_R_SELECT);
6930
case CL_RETVAL_SELECT_TIMEOUT:
6931
CL_LOG(CL_LOG_INFO,"got select timeout");
6932
wait_for_events = 0;
6934
case CL_RETVAL_SELECT_INTERRUPT:
6935
CL_LOG(CL_LOG_WARNING,"got select interrupt");
6936
wait_for_events = 0;
6938
case CL_RETVAL_NO_SELECT_DESCRIPTORS:
6939
CL_LOG(CL_LOG_INFO,"no select descriptors");
6940
wait_for_events = 1;
6943
/* do not wait for events, because we do select */
6944
wait_for_events = 0;
6947
CL_LOG_STR(CL_LOG_ERROR,"got error:",cl_get_error_text(ret_val));
6952
cl_raw_list_lock(handle->connection_list);
6954
elem = cl_connection_list_get_first_elem(handle->connection_list);
6955
gettimeofday(&now,NULL);
6958
switch(elem->connection->connection_state) {
6960
case CL_DISCONNECTED: {
6961
/* open connection if there are messages to send */
6962
if ( cl_raw_list_get_elem_count(elem->connection->send_message_list) > 0) {
6963
CL_LOG(CL_LOG_INFO,"setting connection state to CL_OPENING");
6964
elem->connection->connection_state = CL_OPENING;
6970
/* trigger connect */
6971
if (elem->connection->data_read_flag == CL_COM_DATA_READY) {
6972
return_value = cl_com_open_connection(elem->connection, handle->open_connection_timeout, NULL, NULL);
6973
if (return_value != CL_RETVAL_OK && return_value != CL_RETVAL_UNCOMPLETE_WRITE ) {
6974
CL_LOG_STR(CL_LOG_ERROR,"could not open connection:",cl_get_error_text(return_value));
6975
elem->connection->connection_state = CL_CLOSING;
6976
elem->connection->connection_sub_state = CL_COM_DO_SHUTDOWN;
6978
if (return_value != CL_RETVAL_OK && cl_com_get_ignore_timeouts_flag() == CL_TRUE) {
6979
CL_LOG(CL_LOG_WARNING,"setting connection state to closing");
6980
elem->connection->connection_state = CL_CLOSING;
6981
elem->connection->connection_sub_state = CL_COM_DO_SHUTDOWN;
6984
/* check timeouts */
6985
if ( elem->connection->read_buffer_timeout_time != 0) {
6986
if ( now.tv_sec >= elem->connection->read_buffer_timeout_time || cl_com_get_ignore_timeouts_flag() == CL_TRUE) {
6987
CL_LOG(CL_LOG_ERROR,"read timeout for connection opening");
6988
elem->connection->connection_state = CL_CLOSING;
6989
elem->connection->connection_sub_state = CL_COM_DO_SHUTDOWN;
6992
if ( elem->connection->write_buffer_timeout_time != 0) {
6993
if ( now.tv_sec >= elem->connection->write_buffer_timeout_time || cl_com_get_ignore_timeouts_flag() == CL_TRUE ) {
6994
CL_LOG(CL_LOG_ERROR,"write timeout for connection opening");
6995
elem->connection->connection_state = CL_CLOSING;
6996
elem->connection->connection_sub_state = CL_COM_DO_SHUTDOWN;
7003
case CL_ACCEPTING: {
7005
if (elem->connection->data_read_flag == CL_COM_DATA_READY ) {
7006
return_value = cl_com_connection_complete_accept(elem->connection,handle->open_connection_timeout);
7007
if (return_value != CL_RETVAL_OK) {
7008
if (return_value != CL_RETVAL_UNCOMPLETE_READ &&
7009
return_value != CL_RETVAL_UNCOMPLETE_WRITE &&
7010
return_value != CL_RETVAL_SELECT_ERROR ) {
7011
CL_LOG_STR(CL_LOG_ERROR,"connection accept error:",cl_get_error_text(return_value));
7012
elem->connection->connection_state = CL_CLOSING;
7013
elem->connection->connection_sub_state = CL_COM_DO_SHUTDOWN;
7014
} else if (cl_com_get_ignore_timeouts_flag() == CL_TRUE) {
7015
elem->connection->connection_state = CL_CLOSING;
7016
elem->connection->connection_sub_state = CL_COM_DO_SHUTDOWN;
7019
elem->connection->connection_state = CL_CONNECTING;
7020
elem->connection->connection_sub_state = CL_COM_READ_INIT;
7021
elem->connection->data_read_flag = CL_COM_DATA_NOT_READY;
7024
/* check timeouts */
7025
if ( elem->connection->read_buffer_timeout_time != 0) {
7026
if ( now.tv_sec >= elem->connection->read_buffer_timeout_time ) {
7027
CL_LOG(CL_LOG_ERROR,"accept timeout for connection");
7028
elem->connection->connection_state = CL_CLOSING;
7029
elem->connection->connection_sub_state = CL_COM_DO_SHUTDOWN;
7032
if ( elem->connection->write_buffer_timeout_time != 0) {
7033
if ( now.tv_sec >= elem->connection->write_buffer_timeout_time ) {
7034
CL_LOG(CL_LOG_ERROR,"accept timeout for connection");
7035
elem->connection->connection_state = CL_CLOSING;
7036
elem->connection->connection_sub_state = CL_COM_DO_SHUTDOWN;
7043
case CL_CONNECTING: {
7044
if (elem->connection->data_read_flag == CL_COM_DATA_READY) {
7045
return_value = cl_com_connection_complete_request(handle->connection_list, elem, handle->open_connection_timeout, CL_R_SELECT);
7047
if (return_value != CL_RETVAL_OK) {
7048
if (return_value != CL_RETVAL_UNCOMPLETE_READ &&
7049
return_value != CL_RETVAL_UNCOMPLETE_WRITE &&
7050
return_value != CL_RETVAL_SELECT_ERROR) {
7051
CL_LOG_STR(CL_LOG_ERROR,"connection establish error:", cl_get_error_text(return_value));
7052
elem->connection->connection_state = CL_CLOSING;
7053
elem->connection->connection_sub_state = CL_COM_DO_SHUTDOWN;
7054
} else if (cl_com_get_ignore_timeouts_flag() == CL_TRUE) {
7055
elem->connection->connection_state = CL_CLOSING;
7056
elem->connection->connection_sub_state = CL_COM_DO_SHUTDOWN;
7059
if (elem->connection->connection_state == CL_CONNECTED ) {
7060
cl_commlib_finish_request_completeness(elem->connection);
7061
/* connection is now in connect state, do select before next reading */
7062
elem->connection->data_read_flag = CL_COM_DATA_NOT_READY;
7065
/* check timeouts */
7066
if (elem->connection->read_buffer_timeout_time != 0) {
7067
if ( now.tv_sec >= elem->connection->read_buffer_timeout_time || cl_com_get_ignore_timeouts_flag() == CL_TRUE) {
7068
CL_LOG(CL_LOG_ERROR,"read timeout for connection completion");
7069
elem->connection->connection_state = CL_CLOSING;
7070
elem->connection->connection_sub_state = CL_COM_DO_SHUTDOWN;
7073
if ( elem->connection->write_buffer_timeout_time != 0) {
7074
if ( now.tv_sec >= elem->connection->write_buffer_timeout_time || cl_com_get_ignore_timeouts_flag() == CL_TRUE) {
7075
CL_LOG(CL_LOG_ERROR,"write timeout for connection completion");
7076
elem->connection->connection_state = CL_CLOSING;
7077
elem->connection->connection_sub_state = CL_COM_DO_SHUTDOWN;
7084
case CL_CONNECTED: {
7086
/* check ack timeouts */
7087
return_value = cl_commlib_handle_connection_ack_timeouts(elem->connection);
7089
if (elem->connection->data_read_flag == CL_COM_DATA_READY &&
7090
elem->connection->ccrm_sent == 0 &&
7091
elem->connection->ccrm_received == 0) {
7092
/* TODO: use read thread pool */
7093
return_value = cl_commlib_handle_connection_read(elem->connection);
7094
if (return_value != CL_RETVAL_OK) {
7095
if (return_value != CL_RETVAL_UNCOMPLETE_READ &&
7096
return_value != CL_RETVAL_SELECT_ERROR ) {
7097
elem->connection->connection_state = CL_CLOSING;
7098
elem->connection->connection_sub_state = CL_COM_DO_SHUTDOWN;
7099
CL_LOG_STR(CL_LOG_ERROR,"read from connection: setting close flag! Reason:", cl_get_error_text(return_value));
7100
snprintf(tmp_string, 1024, MSG_CL_COMMLIB_CLOSING_SSU,
7101
elem->connection->remote->comp_host,
7102
elem->connection->remote->comp_name,
7103
sge_u32c(elem->connection->remote->comp_id));
7104
cl_commlib_push_application_error(CL_LOG_ERROR, return_value, tmp_string);
7105
} else if (cl_com_get_ignore_timeouts_flag() == CL_TRUE) {
7106
elem->connection->connection_state = CL_CLOSING;
7107
elem->connection->connection_sub_state = CL_COM_DO_SHUTDOWN;
7110
message_received = 1;
7112
/* check timeouts */
7113
if ( elem->connection->read_buffer_timeout_time != 0) {
7114
if ( now.tv_sec >= elem->connection->read_buffer_timeout_time ) {
7115
CL_LOG(CL_LOG_ERROR,"connection read timeout");
7116
elem->connection->connection_state = CL_CLOSING;
7117
elem->connection->connection_sub_state = CL_COM_DO_SHUTDOWN;
7120
if ( elem->connection->ccrm_received != 0 ) {
7121
CL_LOG(CL_LOG_WARNING, "will not read from this connection, because ccrm was received!");
7125
if (elem->connection->ccm_received == 1 ) {
7126
if( cl_raw_list_get_elem_count(elem->connection->send_message_list) == 0 &&
7127
cl_raw_list_get_elem_count(elem->connection->received_message_list) == 0) {
7128
elem->connection->ccm_received = 2;
7129
elem->connection->connection_sub_state = CL_COM_SENDING_CCRM;
7130
cl_commlib_send_ccrm_message(elem->connection);
7132
CL_LOG_INT(CL_LOG_INFO,"receive buffer:",(int)cl_raw_list_get_elem_count(elem->connection->received_message_list) );
7133
CL_LOG_INT(CL_LOG_INFO,"send buffer :",(int)cl_raw_list_get_elem_count(elem->connection->send_message_list) );
7141
} /* end of switch */
7143
if (elem->connection->data_write_flag == CL_COM_DATA_READY) {
7144
/* there is data to write, trigger write thread */
7145
trigger_write_thread = 1;
7148
elem = cl_connection_list_get_next_elem(elem);
7150
cl_raw_list_unlock(handle->connection_list);
7152
/* check for new connections */
7153
if (handle->service_provider == CL_TRUE &&
7154
handle->service_handler->data_read_flag == CL_COM_DATA_READY) {
7156
/* we have a new connection request */
7157
cl_com_connection_t* new_con = NULL;
7158
cl_com_connection_request_handler(handle->service_handler, &new_con);
7159
if (new_con != NULL) {
7160
/* got new connection request */
7161
new_con->handler = handle->service_handler->handler;
7162
CL_LOG(CL_LOG_INFO,"adding new client");
7163
gettimeofday(&now,NULL);
7164
new_con->read_buffer_timeout_time = now.tv_sec + handle->open_connection_timeout;
7165
cl_connection_list_append_connection(handle->connection_list, new_con, 1);
7166
handle->statistic->new_connections = handle->statistic->new_connections + 1;
7172
if (trigger_write_thread != 0) {
7173
cl_thread_trigger_event(handle->write_thread);
7176
/* trigger threads which are waiting for read_condition when
7177
we will do a wait by itself (because we have no descriptors
7178
for reading) or when we have received a message */
7179
if ( wait_for_events || message_received != 0 ) {
7180
cl_thread_trigger_thread_condition(handle->read_condition,1);
7182
/* if we have received a message which was no protocol message
7183
trigger application ( cl_commlib_receive_message() ) */
7188
/* TODO: never wait for events here, use dummy pipe to wakeup read_select when
7189
a new connection is added, use select() to sleep */
7191
if (wait_for_events != 0) {
7192
if ((ret_val = cl_thread_wait_for_event(thread_config,select_sec_timeout,select_usec_timeout )) != CL_RETVAL_OK) {
7194
case CL_RETVAL_CONDITION_WAIT_TIMEOUT:
7195
CL_LOG(CL_LOG_INFO,"READ THREAD GOT CONDITION WAIT TIMEOUT");
7198
CL_LOG_STR( CL_LOG_INFO, ">got error<: ", cl_get_error_text(ret_val));
7202
/* cleanup all trigger events */
7203
cl_thread_clear_events(thread_config);
7207
CL_LOG(CL_LOG_INFO, "exiting ...");
7209
/* at least set exit state */
7210
cl_thread_func_cleanup(thread_config);
7219
#ifdef __CL_FUNCTION__
7220
#undef __CL_FUNCTION__
7222
#define __CL_FUNCTION__ "cl_com_handle_write_thread()"
7223
static void *cl_com_handle_write_thread(void *t_conf) {
7224
int return_value = CL_RETVAL_OK;
7225
int ret_val = CL_RETVAL_OK;
7227
int wait_for_events = 1;
7228
int select_sec_timeout = 0;
7229
int select_usec_timeout = 100*1000;
7230
cl_connection_list_elem_t* elem = NULL;
7232
cl_handle_list_elem_t* handle_elem = NULL;
7233
cl_com_handle_t* handle = NULL;
7234
int trigger_read_thread = 0;
7235
char tmp_string[1024];
7236
cl_app_message_queue_elem_t* mq_elem = NULL;
7237
int mq_return_value = CL_RETVAL_OK;
7239
/* get pointer to cl_thread_settings_t struct */
7240
cl_thread_settings_t *thread_config = (cl_thread_settings_t*)t_conf;
7243
/* set thread config data */
7244
if (cl_thread_set_thread_config(thread_config) != CL_RETVAL_OK) {
7245
CL_LOG(CL_LOG_ERROR,"thread setup error");
7250
CL_LOG(CL_LOG_INFO, "starting initialization ...");
7252
/* thread init done, trigger startup conditon variable*/
7253
cl_thread_func_startup(thread_config);
7254
CL_LOG(CL_LOG_INFO, "starting main loop ...");
7256
/* ok, thread main */
7257
while (do_exit == 0) {
7258
trigger_read_thread = 0;
7259
cl_thread_func_testcancel(thread_config);
7261
if (handle == NULL) {
7262
cl_raw_list_lock(cl_com_handle_list);
7263
handle_elem = cl_handle_list_get_first_elem(cl_com_handle_list);
7264
while(handle_elem && handle == NULL ) {
7265
if (handle_elem->handle->write_thread == thread_config) {
7266
handle = handle_elem->handle;
7267
select_sec_timeout = handle->select_sec_timeout;
7268
select_usec_timeout = handle->select_usec_timeout;
7269
wait_for_events = 0;
7271
handle_elem = cl_handle_list_get_next_elem(handle_elem);
7273
cl_raw_list_unlock(cl_com_handle_list);
7277
cl_raw_list_lock(handle->send_message_queue);
7278
while((mq_elem = cl_app_message_queue_get_first_elem(handle->send_message_queue)) != NULL) {
7279
mq_return_value = cl_commlib_send_message_to_endpoint(handle, mq_elem->snd_destination,
7280
mq_elem->snd_ack_type, mq_elem->snd_data,
7281
mq_elem->snd_size, mq_elem->snd_response_mid,
7283
/* remove queue entries */
7284
cl_raw_list_remove_elem(handle->send_message_queue, mq_elem->raw_elem);
7285
if (mq_return_value != CL_RETVAL_OK) {
7286
CL_LOG_STR(CL_LOG_ERROR,"can't send message:", cl_get_error_text(mq_return_value));
7287
free(mq_elem->snd_data);
7289
cl_com_free_endpoint(&(mq_elem->snd_destination));
7292
cl_raw_list_unlock(handle->send_message_queue);
7295
/* do write select */
7296
ret_val = cl_com_open_connection_request_handler(handle, handle->select_sec_timeout , handle->select_usec_timeout, CL_W_SELECT);
7298
case CL_RETVAL_SELECT_TIMEOUT:
7299
CL_LOG(CL_LOG_INFO,"write select timeout");
7300
wait_for_events = 0;
7302
case CL_RETVAL_SELECT_INTERRUPT:
7303
CL_LOG(CL_LOG_WARNING,"select interrupt");
7304
wait_for_events = 0;
7306
case CL_RETVAL_NO_SELECT_DESCRIPTORS:
7307
CL_LOG(CL_LOG_INFO,"no descriptors to select");
7308
wait_for_events = 1;
7311
wait_for_events = 1;
7312
cl_raw_list_lock(handle->connection_list);
7314
elem = cl_connection_list_get_first_elem(handle->connection_list);
7315
gettimeofday(&now,NULL);
7317
switch(elem->connection->connection_state) {
7319
/* trigger connect */
7320
if (elem->connection->fd_ready_for_write == CL_COM_DATA_READY &&
7321
elem->connection->data_write_flag == CL_COM_DATA_READY ) {
7322
return_value = cl_com_open_connection(elem->connection, handle->open_connection_timeout, NULL, NULL);
7323
if (return_value != CL_RETVAL_OK) {
7324
if (return_value != CL_RETVAL_UNCOMPLETE_WRITE ) {
7325
CL_LOG_STR(CL_LOG_ERROR,"could not open connection:",cl_get_error_text(return_value));
7326
elem->connection->connection_state = CL_CLOSING;
7327
elem->connection->connection_sub_state = CL_COM_DO_SHUTDOWN;
7328
} else if (cl_com_get_ignore_timeouts_flag() == CL_TRUE) {
7329
CL_LOG(CL_LOG_WARNING,"setting connection state to closing");
7330
elem->connection->connection_state = CL_CLOSING;
7331
elem->connection->connection_sub_state = CL_COM_DO_SHUTDOWN;
7335
/* check timeouts */
7336
if ( elem->connection->read_buffer_timeout_time != 0) {
7337
if ( now.tv_sec >= elem->connection->read_buffer_timeout_time || cl_com_get_ignore_timeouts_flag() == CL_TRUE) {
7338
CL_LOG(CL_LOG_ERROR,"read timeout for connection opening");
7339
elem->connection->connection_state = CL_CLOSING;
7340
elem->connection->connection_sub_state = CL_COM_DO_SHUTDOWN;
7343
if ( elem->connection->write_buffer_timeout_time != 0) {
7344
if ( now.tv_sec >= elem->connection->write_buffer_timeout_time || cl_com_get_ignore_timeouts_flag() == CL_TRUE) {
7345
CL_LOG(CL_LOG_ERROR,"write timeout for connection opening");
7346
elem->connection->connection_state = CL_CLOSING;
7347
elem->connection->connection_sub_state = CL_COM_DO_SHUTDOWN;
7354
case CL_ACCEPTING: {
7356
CL_LOG(CL_LOG_WARNING,"connection state is CL_ACCEPTING");
7358
if (elem->connection->fd_ready_for_write == CL_COM_DATA_READY &&
7359
elem->connection->data_write_flag == CL_COM_DATA_READY ) {
7360
return_value = cl_com_connection_complete_accept(elem->connection,handle->open_connection_timeout);
7361
if (return_value != CL_RETVAL_OK) {
7362
if (return_value != CL_RETVAL_UNCOMPLETE_READ &&
7363
return_value != CL_RETVAL_UNCOMPLETE_WRITE &&
7364
return_value != CL_RETVAL_SELECT_ERROR ) {
7365
CL_LOG_STR(CL_LOG_ERROR,"connection accept error:",cl_get_error_text(return_value));
7366
elem->connection->connection_state = CL_CLOSING;
7367
elem->connection->connection_sub_state = CL_COM_DO_SHUTDOWN;
7368
} else if (cl_com_get_ignore_timeouts_flag() == CL_TRUE) {
7369
elem->connection->connection_state = CL_CLOSING;
7370
elem->connection->connection_sub_state = CL_COM_DO_SHUTDOWN;
7373
elem->connection->connection_state = CL_CONNECTING;
7374
elem->connection->connection_sub_state = CL_COM_READ_INIT;
7375
elem->connection->data_read_flag = CL_COM_DATA_NOT_READY;
7378
/* check timeouts */
7379
if ( elem->connection->read_buffer_timeout_time != 0) {
7380
if ( now.tv_sec >= elem->connection->read_buffer_timeout_time ) {
7381
CL_LOG(CL_LOG_ERROR,"accept timeout for connection");
7382
elem->connection->connection_state = CL_CLOSING;
7383
elem->connection->connection_sub_state = CL_COM_DO_SHUTDOWN;
7386
if ( elem->connection->write_buffer_timeout_time != 0) {
7387
if ( now.tv_sec >= elem->connection->write_buffer_timeout_time ) {
7388
CL_LOG(CL_LOG_ERROR,"accept timeout for connection");
7389
elem->connection->connection_state = CL_CLOSING;
7390
elem->connection->connection_sub_state = CL_COM_DO_SHUTDOWN;
7397
case CL_CONNECTING: {
7398
if ( elem->connection->fd_ready_for_write == CL_COM_DATA_READY &&
7399
elem->connection->data_write_flag == CL_COM_DATA_READY ) {
7400
return_value = cl_com_connection_complete_request(handle->connection_list, elem, handle->open_connection_timeout, CL_W_SELECT);
7401
if (return_value != CL_RETVAL_OK) {
7402
if (return_value != CL_RETVAL_UNCOMPLETE_READ &&
7403
return_value != CL_RETVAL_UNCOMPLETE_WRITE &&
7404
return_value != CL_RETVAL_SELECT_ERROR ) {
7405
CL_LOG_STR(CL_LOG_ERROR,"connection establish error:",cl_get_error_text(return_value));
7406
elem->connection->connection_state = CL_CLOSING;
7407
elem->connection->connection_sub_state = CL_COM_DO_SHUTDOWN;
7408
} else if (cl_com_get_ignore_timeouts_flag() == CL_TRUE) {
7409
elem->connection->connection_state = CL_CLOSING;
7410
elem->connection->connection_sub_state = CL_COM_DO_SHUTDOWN;
7413
if (elem->connection->connection_state == CL_CONNECTED) {
7414
cl_commlib_finish_request_completeness(elem->connection);
7415
/* connection is now in connect state, do select before next reading */
7416
elem->connection->fd_ready_for_write = CL_COM_DATA_NOT_READY;
7419
/* check timeouts */
7420
if ( elem->connection->read_buffer_timeout_time != 0) {
7421
if ( now.tv_sec >= elem->connection->read_buffer_timeout_time || cl_com_get_ignore_timeouts_flag() == CL_TRUE ) {
7422
CL_LOG(CL_LOG_ERROR,"read timeout for connection completion");
7423
elem->connection->connection_state = CL_CLOSING;
7424
elem->connection->connection_sub_state = CL_COM_DO_SHUTDOWN;
7427
if ( elem->connection->write_buffer_timeout_time != 0) {
7428
if ( now.tv_sec >= elem->connection->write_buffer_timeout_time || cl_com_get_ignore_timeouts_flag() == CL_TRUE ) {
7429
CL_LOG(CL_LOG_ERROR,"write timeout for connection completion");
7430
elem->connection->connection_state = CL_CLOSING;
7431
elem->connection->connection_sub_state = CL_COM_DO_SHUTDOWN;
7438
case CL_CONNECTED: {
7439
if (elem->connection->fd_ready_for_write == CL_COM_DATA_READY &&
7440
elem->connection->data_write_flag == CL_COM_DATA_READY &&
7441
elem->connection->ccrm_sent == 0 ) {
7442
/* TODO implement thread pool for data writing */
7443
return_value = cl_commlib_handle_connection_write(elem->connection);
7444
if (return_value != CL_RETVAL_OK) {
7445
if (return_value != CL_RETVAL_UNCOMPLETE_WRITE &&
7446
return_value != CL_RETVAL_SELECT_ERROR ) {
7447
elem->connection->connection_state = CL_CLOSING;
7448
elem->connection->connection_sub_state = CL_COM_DO_SHUTDOWN;
7449
CL_LOG_STR(CL_LOG_ERROR,"write to connection: setting close flag! Reason:", cl_get_error_text(return_value));
7450
snprintf(tmp_string, 1024, MSG_CL_COMMLIB_CLOSING_SSU,
7451
elem->connection->remote->comp_host,
7452
elem->connection->remote->comp_name,
7453
sge_u32c(elem->connection->remote->comp_id));
7454
cl_commlib_push_application_error(CL_LOG_ERROR, return_value, tmp_string);
7455
} else if (cl_com_get_ignore_timeouts_flag() == CL_TRUE) {
7456
elem->connection->connection_state = CL_CLOSING;
7457
elem->connection->connection_sub_state = CL_COM_DO_SHUTDOWN;
7461
/* check timeouts */
7462
if ( elem->connection->write_buffer_timeout_time != 0) {
7463
if ( now.tv_sec >= elem->connection->write_buffer_timeout_time ) {
7464
CL_LOG(CL_LOG_ERROR,"write timeout for connected endpoint");
7465
snprintf(tmp_string, 1024, MSG_CL_COMMLIB_CLOSING_SSU,
7466
elem->connection->remote->comp_host,
7467
elem->connection->remote->comp_name,
7468
sge_u32c(elem->connection->remote->comp_id));
7469
cl_commlib_push_application_error(CL_LOG_ERROR, CL_RETVAL_SEND_TIMEOUT, tmp_string);
7470
elem->connection->connection_state = CL_CLOSING;
7471
elem->connection->connection_sub_state = CL_COM_DO_SHUTDOWN;
7480
} /* end of switch */
7481
if (elem->connection->ccm_received == 1 ) {
7482
if( cl_raw_list_get_elem_count(elem->connection->send_message_list) == 0 &&
7483
cl_raw_list_get_elem_count(elem->connection->received_message_list) == 0) {
7485
elem->connection->ccm_received = 2;
7486
elem->connection->connection_sub_state = CL_COM_SENDING_CCRM;
7487
cl_commlib_send_ccrm_message(elem->connection);
7488
CL_LOG(CL_LOG_INFO,"sending ccrm");
7490
CL_LOG_INT(CL_LOG_INFO,"receive buffer:",(int)cl_raw_list_get_elem_count(elem->connection->received_message_list) );
7491
CL_LOG_INT(CL_LOG_INFO,"send buffer :",(int)cl_raw_list_get_elem_count(elem->connection->send_message_list) );
7494
if (elem->connection->ccrm_sent != 0) {
7495
trigger_read_thread = 1;
7497
if (elem->connection->data_write_flag == CL_COM_DATA_READY) {
7498
/* still data to write, do not wait for events */
7499
wait_for_events = 0;
7501
elem = cl_connection_list_get_next_elem(elem);
7504
/* now take list entry and add it at least if there are more connections */
7505
/* this must be done in order to NOT prefer the first connection in list */
7506
if ( cl_raw_list_get_elem_count(handle->connection_list) > 1) {
7507
elem = cl_connection_list_get_first_elem(handle->connection_list);
7508
cl_raw_list_dechain_elem(handle->connection_list,elem->raw_elem);
7509
cl_raw_list_append_dechained_elem(handle->connection_list,elem->raw_elem);
7512
cl_raw_list_unlock(handle->connection_list);
7513
if (trigger_read_thread != 0) {
7514
#ifdef CL_DO_COMMLIB_DEBUG
7515
CL_LOG(CL_LOG_DEBUG,"triggering read thread");
7517
cl_thread_trigger_event(handle->read_thread);
7521
wait_for_events = 1;
7522
CL_LOG_STR(CL_LOG_ERROR,"got error:",cl_get_error_text(ret_val));
7526
if (wait_for_events != 0 ) {
7527
if ((ret_val = cl_thread_wait_for_event(thread_config, select_sec_timeout, select_usec_timeout )) != CL_RETVAL_OK) {
7529
case CL_RETVAL_CONDITION_WAIT_TIMEOUT:
7530
CL_LOG(CL_LOG_INFO,"WRITE THREAD GOT CONDITION WAIT TIMEOUT");
7533
CL_LOG_STR( CL_LOG_INFO, ">got error<: ", cl_get_error_text(ret_val));
7537
/* set event count to zero, because this thread will
7538
not wait when something is to do */
7539
cl_thread_clear_events(thread_config);
7542
CL_LOG(CL_LOG_INFO, "exiting ...");
7544
/* at least set exit state */
7545
cl_thread_func_cleanup(thread_config);
7551
#ifdef __CL_FUNCTION__
7552
#undef __CL_FUNCTION__
7554
#define __CL_FUNCTION__ "getuniquehostname()"
7555
/* MT-NOTE: getuniquehostname() is MT safe */
7556
int getuniquehostname(const char *hostin, char *hostout, int refresh_aliases) {
7557
char* resolved_host = NULL;
7560
if (refresh_aliases != 0) {
7561
/* TODO: refresh host alias file? But it's never used */
7562
CL_LOG(CL_LOG_ERROR,"getuniquehostname() refresh of alias file not implemented");
7564
ret_val = cl_com_cached_gethostbyname((char*)hostin, &resolved_host, NULL, NULL, NULL );
7565
if (resolved_host != NULL) {
7566
if (strlen(resolved_host) >= CL_MAXHOSTLEN ) {
7567
char tmp_buffer[1024];
7568
snprintf(tmp_buffer, 1024, MSG_CL_COMMLIB_HOSTNAME_EXEEDS_MAX_HOSTNAME_LENGTH_SU,
7569
resolved_host, sge_u32c(CL_MAXHOSTLEN));
7570
cl_commlib_push_application_error(CL_LOG_ERROR, CL_RETVAL_HOSTNAME_LENGTH_ERROR, tmp_buffer);
7571
free(resolved_host);
7572
return CL_RETVAL_HOSTNAME_LENGTH_ERROR;
7574
snprintf(hostout, CL_MAXHOSTLEN, "%s", resolved_host);
7575
free(resolved_host);
7581
#ifdef __CL_FUNCTION__
7582
#undef __CL_FUNCTION__
7584
#define __CL_FUNCTION__ "cl_commlib_app_message_queue_cleanup()"
7585
static void cl_commlib_app_message_queue_cleanup(cl_com_handle_t* handle) {
7587
do some connection cleanup
7588
- remove received messages not fetched by application
7590
pthread_mutex_lock(handle->messages_ready_mutex);
7591
if (handle->messages_ready_for_read != 0) {
7592
cl_app_message_queue_elem_t* app_mq_elem = NULL;
7594
long timeout_time = 0;
7595
cl_com_connection_t* connection = NULL;
7597
/* compute timeout */
7598
gettimeofday(&now, NULL);
7600
cl_raw_list_lock(handle->received_message_queue);
7601
app_mq_elem = cl_app_message_queue_get_first_elem(handle->received_message_queue);
7602
while(app_mq_elem) {
7603
cl_message_list_elem_t* message_elem = NULL;
7604
cl_message_list_elem_t* next_message_elem = NULL;
7606
connection = app_mq_elem->rcv_connection;
7607
app_mq_elem = cl_app_message_queue_get_next_elem(app_mq_elem);
7609
cl_raw_list_lock(connection->received_message_list);
7610
next_message_elem = cl_message_list_get_first_elem(connection->received_message_list);
7611
while(next_message_elem) {
7612
cl_com_message_t* message = NULL;
7613
message_elem = next_message_elem;
7614
next_message_elem = cl_message_list_get_next_elem(message_elem);
7615
message = message_elem->message;
7616
if (message != NULL && message->message_state == CL_MS_READY) {
7617
timeout_time = message->message_receive_time.tv_sec + handle->message_timeout;
7618
if (timeout_time <= now.tv_sec) {
7619
CL_LOG(CL_LOG_WARNING,"removing message because of message_timeout");
7621
cl_message_list_remove_receive(connection, message, 0);
7622
handle->messages_ready_for_read = handle->messages_ready_for_read - 1;
7623
cl_app_message_queue_remove(handle->received_message_queue, connection, 0, CL_FALSE);
7624
cl_com_free_message(&message);
7628
cl_raw_list_unlock(connection->received_message_list);
7630
cl_raw_list_unlock(handle->received_message_queue);
7632
pthread_mutex_unlock(handle->messages_ready_mutex);
7635
/****** cl_commlib/cl_com_messages_in_send_queue() ****************************
7637
* cl_com_messages_in_send_queue() -- Returns the number of messages in the
7638
* send queue of the communication library
7641
* unsigned long cl_com_messages_in_send_queue(cl_com_handle_t *handle)
7644
* Returns the number of messages in the send queue of the commlib
7645
* library, i.e. the messages that were placed into the send queue
7646
* using the cl_commlib_send_message() function but were not
7650
* cl_com_handle_t *handle - Handle of the commlib instance.
7653
* unsigned long - Number of messages in send queue.
7656
* MT-NOTE: cl_com_messages_in_send_queue() is MT safe
7659
* cl_commlib/cl_commlib_send_message
7660
*******************************************************************************/
7661
unsigned long cl_com_messages_in_send_queue(cl_com_handle_t *handle)
7663
cl_connection_list_elem_t *con_elem = NULL;
7664
unsigned long elems = 0;
7666
if (handle != NULL) {
7667
if (handle->connection_list != NULL) {
7668
cl_raw_list_lock(handle->connection_list);
7669
con_elem = cl_connection_list_get_first_elem(handle->connection_list);
7671
if (con_elem != NULL) {
7672
cl_raw_list_lock(con_elem->connection->send_message_list);
7673
elems = cl_raw_list_get_elem_count(con_elem->connection->send_message_list);
7674
cl_raw_list_unlock(con_elem->connection->send_message_list);
7676
cl_raw_list_unlock(handle->connection_list);