33
38
msg = log_msg_new(msg_str, strlen(msg_str), g_sockaddr_inet_new("10.10.10.10", 1010), &parse_options);
34
39
log_msg_add_ack(msg, &path_options);
35
40
msg->ack_func = test_ack;
36
if (!log_queue_push_tail((*q), msg, &path_options))
38
fprintf(stderr, "Queue unable to consume enough messages: %d\n", fed_messages);
41
log_queue_push_tail((*q), msg, &path_options);
122
/* no synchronization between the feed/consume threads, therefore it does
123
* not succeed reliably. commented out for now, will fix at the next
124
* logqueue related threaded issue */
126
GStaticMutex threaded_lock = G_STATIC_MUTEX_INIT;
122
#define MESSAGES_PER_FEEDER 50000
123
#define MESSAGES_SUM (FEEDERS * MESSAGES_PER_FEEDER)
128
struct list_head finish_callbacks;
132
#define finish_callbacks __tls_deref(finish_callbacks)
135
main_loop_io_worker_register_finish_callback(MainLoopIOWorkerFinishCallback *cb)
137
list_add(&cb->list, &finish_callbacks);
141
main_loop_io_worker_invoke_finish_callbacks(void)
143
struct list_head *lh, *lh2;
145
list_for_each_safe(lh, lh2, &finish_callbacks)
147
MainLoopIOWorkerFinishCallback *cb = list_entry(lh, MainLoopIOWorkerFinishCallback, list);
149
cb->func(cb->user_data);
150
list_del_init(&cb->list);
129
threaded_feed(gpointer st)
158
threaded_feed(gpointer args)
131
LogQueue *q = (LogQueue *) st;
160
LogQueue *q = (LogQueue *) ((gpointer *) args)[0];
161
gint id = GPOINTER_TO_INT(((gpointer *) args)[1]);
132
162
char *msg_str = "<155>2006-02-11T10:34:56+01:00 bzorp syslog-ng[23323]: árvíztűrőtükörfúrógép";
163
gint msg_len = strlen(msg_str);
134
165
LogPathOptions path_options = LOG_PATH_OPTIONS_INIT;
137
for (i = 0; i < 100000; i++)
166
LogMessage *msg, *tmpl;
173
/* emulate main loop for LogQueue */
174
main_loop_io_worker_set_thread_id(id);
175
INIT_LIST_HEAD(&finish_callbacks);
177
sa = g_sockaddr_inet_new("10.10.10.10", 1010);
178
tmpl = log_msg_new(msg_str, msg_len, g_sockaddr_ref(sa), &parse_options);
179
g_get_current_time(&start);
180
for (i = 0; i < MESSAGES_PER_FEEDER; i++)
139
msg = log_msg_new(msg_str, strlen(msg_str), g_sockaddr_inet_new("10.10.10.10", 1010), &parse_options);
182
msg = log_msg_clone_cow(tmpl, &path_options);
140
183
log_msg_add_ack(msg, &path_options);
141
184
msg->ack_func = test_ack;
143
g_static_mutex_lock(&threaded_lock);
144
if (!log_queue_push_tail(q, msg, &path_options))
146
fprintf(stderr, "Queue unable to consume enough messages: %d\n", fed_messages);
147
return GUINT_TO_POINTER(1);
149
g_static_mutex_unlock(&threaded_lock);
186
log_queue_push_tail(q, msg, &path_options);
189
main_loop_io_worker_invoke_finish_callbacks();
191
main_loop_io_worker_invoke_finish_callbacks();
192
g_get_current_time(&end);
193
diff = g_time_val_diff(&end, &start);
194
g_static_mutex_lock(&tlock);
196
g_static_mutex_unlock(&tlock);
160
207
gboolean success;
163
for (i = 0; i < 100000; i++)
210
/* just to make sure time is properly cached */
213
for (i = 0; i < MESSAGES_SUM; i++)
165
g_static_mutex_lock(&threaded_lock);
167
success = log_queue_pop_head(q, &msg, &path_options, FALSE);
168
g_static_mutex_unlock(&threaded_lock);
220
success = log_queue_pop_head(q, &msg, &path_options, FALSE, FALSE);
227
ns.tv_nsec = 1000000;
228
nanosleep(&ns, NULL);
232
/* slept for more than 10 seconds */
233
fprintf(stderr, "The wait for messages took too much time, i=%d\n", i);
234
return GUINT_TO_POINTER(1);
170
240
g_assert(!success || (success && msg != NULL));
185
256
testcase_with_threads()
188
GThread *thread_feed, *thread_consume;
259
GThread *thread_feed[FEEDERS], *thread_consume;
260
gpointer args[FEEDERS][2];
191
for (i = 0; i < 100; i++)
263
log_queue_set_max_threads(FEEDERS);
264
for (i = 0; i < TEST_RUNS; i++)
193
q = log_queue_new(100000, 0, 64);
194
thread_feed = g_thread_create(threaded_feed, q, TRUE, NULL);
266
fprintf(stderr,"starting testrun: %d\n",i);
267
q = log_queue_fifo_new(MESSAGES_SUM, NULL);
269
for (j = 0; j < FEEDERS; j++)
272
args[j][1] = GINT_TO_POINTER(j);
273
fprintf(stderr,"starting feed thread %d\n",j);
274
thread_feed[j] = g_thread_create(threaded_feed, args[j], TRUE, NULL);
196
277
thread_consume = g_thread_create(threaded_consume, q, TRUE, NULL);
197
g_thread_join(thread_feed);
279
for (j = 0; j < FEEDERS; j++)
281
fprintf(stderr,"waiting for feed thread %d\n",j);
282
g_thread_join(thread_feed[j]);
198
284
g_thread_join(thread_consume);
288
fprintf(stderr, "Feed speed: %.2lf\n", (double) TEST_RUNS * MESSAGES_SUM * 1000000 / sum_time);
295
fprintf(stderr,"On AIX this testcase can't executed, because the overriding of main_loop_io_worker_register_finish_callback does not work\n");
209
299
putenv("TZ=MET-1METDST");
214
304
msg_format_options_defaults(&parse_options);
215
305
msg_format_options_init(&parse_options, configuration);
307
fprintf(stderr,"Start testcase_with_threads\n");
308
testcase_with_threads();
311
fprintf(stderr,"Start testcase_zero_diskbuf_alternating_send_acks\n");
217
312
testcase_zero_diskbuf_alternating_send_acks();
313
fprintf(stderr,"Start testcase_zero_diskbuf_and_normal_acks\n");
218
314
testcase_zero_diskbuf_and_normal_acks();