~ubuntu-branches/ubuntu/trusty/syslog-ng/trusty-proposed

« back to all changes in this revision

Viewing changes to tests/unit/test_logqueue.c

  • Committer: Package Import Robot
  • Author(s): Laszlo Boszormenyi (GCS), Gergely Nagy
  • Date: 2011-10-11 14:30:48 UTC
  • mfrom: (1.3.7)
  • Revision ID: package-import@ubuntu.com-20111011143048-r1iljux9xbvj3lwh
Tags: 3.3.1.dfsg-1
* New upstream release with important fixes from upstream git tree with
  non-free manpages removed.
* Drop syslog-ng.conf(5) (closes: #496521).
* syslog-ng(8) is generated, and does not mention -Q anymore
  (closes: #616069).
* Supports CAP_SYSLOG on recent kernels (closes: #630172).
* Does not use g_timeout_add_seconds anymore (closes: #609154).

[ Gergely Nagy <algernon@madhouse-project.org> ]
* Update debian/copyright to DEP-5 format.
* Simplified the logrotate file by merging identical entries.
* Include local configuration files from /etc/syslog-ng/conf.d/ (Closes:
  #609050).
* Update syslog-ng.conf to be fully 3.3 compliant.
* Compress both source and binaries (except the syslog-ng meta
  package) with xz, instead of gzip.
* Use dpkg triggers to restart syslog-ng when appropriate.
* Include DFSG-free manual pages for all binaries.
* Build with Hardening enabled.
* Mention syslog(3) in /etc/default/syslog-ng, instead of
  <linux/kernel.h> (Closes: #608605)
* Support 'status' in the init script.
  Patch from Peter Eisentraut <petere@debian.org> (Closes: #644458)
* Build-Depend on libevtlog-dev (>= 0.2.12-5~) for correct shlibs.
* Use [linux-any] in Build-Depends instead of hardcoded links.
  (Closes: #634715)
* Use $SYSLOGNG_OPTS in the init script when reloading syslog-ng.
  (Closes: #589081)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
#include "logqueue.h"
 
2
#include "logqueue-fifo.h"
2
3
#include "logpipe.h"
3
4
#include "apphook.h"
4
5
#include "plugin.h"
 
6
#include "mainloop.h"
 
7
#include "tls-support.h"
5
8
 
6
9
#include <stdlib.h>
7
10
#include <string.h>
 
11
#include <iv.h>
 
12
#include <iv_thread.h>
8
13
 
9
14
int acked_messages = 0;
10
15
int fed_messages = 0;
19
24
}
20
25
 
21
26
void
22
 
feed_some_messages(LogQueue **q, int n, gboolean flow_control)
 
27
feed_some_messages(LogQueue **q, int n, gboolean ack_needed)
23
28
{
24
29
  LogPathOptions path_options = LOG_PATH_OPTIONS_INIT;
25
30
  LogMessage *msg;
26
31
  gint i;
27
32
 
28
 
  path_options.flow_control = flow_control;
 
33
  path_options.ack_needed = ack_needed;
29
34
  for (i = 0; i < n; i++)
30
35
    {
31
36
      char *msg_str = "<155>2006-02-11T10:34:56+01:00 bzorp syslog-ng[23323]: árvíztűrőtükörfúrógép";
33
38
      msg = log_msg_new(msg_str, strlen(msg_str), g_sockaddr_inet_new("10.10.10.10", 1010), &parse_options);
34
39
      log_msg_add_ack(msg, &path_options);
35
40
      msg->ack_func = test_ack;
36
 
      if (!log_queue_push_tail((*q), msg, &path_options))
37
 
        {
38
 
          fprintf(stderr, "Queue unable to consume enough messages: %d\n", fed_messages);
39
 
          exit(1);
40
 
        }
 
41
      log_queue_push_tail((*q), msg, &path_options);
41
42
      fed_messages++;
42
43
    }
43
44
 
52
53
 
53
54
  for (i = 0; i < n; i++)
54
55
    {
55
 
      log_queue_pop_head(q, &msg, &path_options, use_app_acks);
 
56
      log_queue_pop_head(q, &msg, &path_options, use_app_acks, FALSE);
56
57
      log_msg_ack(msg, &path_options);
57
58
      log_msg_unref(msg);
58
59
    }
76
77
  LogQueue *q;
77
78
  gint i;
78
79
 
79
 
  q = log_queue_new(OVERFLOW_SIZE);
 
80
  q = log_queue_fifo_new(OVERFLOW_SIZE, NULL);
80
81
  fed_messages = 0;
81
82
  acked_messages = 0;
82
83
  for (i = 0; i < 10; i++)
90
91
      exit(1);
91
92
    }
92
93
 
93
 
  log_queue_free(q);
 
94
  log_queue_unref(q);
94
95
}
95
96
 
96
97
void
99
100
  LogQueue *q;
100
101
  gint i;
101
102
 
102
 
  q = log_queue_new(OVERFLOW_SIZE);
 
103
  q = log_queue_fifo_new(OVERFLOW_SIZE, NULL);
103
104
  fed_messages = 0;
104
105
  acked_messages = 0;
105
106
  for (i = 0; i < 10; i++)
114
115
      exit(1);
115
116
    }
116
117
 
117
 
  log_queue_free(q);
118
 
}
119
 
 
120
 
#if 0
121
 
 
122
 
/* no synchronization between the feed/consume threads, therefore it does
123
 
 * not succeed reliably. commented out for now, will fix at the next
124
 
 * logqueue related threaded issue */
125
 
 
126
 
GStaticMutex threaded_lock = G_STATIC_MUTEX_INIT;
 
118
  log_queue_unref(q);
 
119
}
 
120
 
 
121
#define FEEDERS 1
 
122
#define MESSAGES_PER_FEEDER 50000
 
123
#define MESSAGES_SUM (FEEDERS * MESSAGES_PER_FEEDER)
 
124
#define TEST_RUNS 10
 
125
 
 
126
TLS_BLOCK_START
 
127
{
 
128
  struct list_head finish_callbacks;
 
129
}
 
130
TLS_BLOCK_END;
 
131
 
 
132
#define finish_callbacks  __tls_deref(finish_callbacks)
 
133
 
 
134
void
 
135
main_loop_io_worker_register_finish_callback(MainLoopIOWorkerFinishCallback *cb)
 
136
{
 
137
  list_add(&cb->list, &finish_callbacks);
 
138
}
 
139
 
 
140
void
 
141
main_loop_io_worker_invoke_finish_callbacks(void)
 
142
{
 
143
  struct list_head *lh, *lh2;
 
144
 
 
145
  list_for_each_safe(lh, lh2, &finish_callbacks)
 
146
    {
 
147
      MainLoopIOWorkerFinishCallback *cb = list_entry(lh, MainLoopIOWorkerFinishCallback, list);
 
148
                            
 
149
      cb->func(cb->user_data);
 
150
      list_del_init(&cb->list);
 
151
    }
 
152
}
 
153
 
 
154
GStaticMutex tlock;
 
155
glong sum_time;
127
156
 
128
157
gpointer
129
 
threaded_feed(gpointer st)
 
158
threaded_feed(gpointer args)
130
159
{
131
 
  LogQueue *q = (LogQueue *) st;
 
160
  LogQueue *q = (LogQueue *) ((gpointer *) args)[0];
 
161
  gint id = GPOINTER_TO_INT(((gpointer *) args)[1]);
132
162
  char *msg_str = "<155>2006-02-11T10:34:56+01:00 bzorp syslog-ng[23323]: árvíztűrőtükörfúrógép";
 
163
  gint msg_len = strlen(msg_str);
133
164
  gint i;
134
165
  LogPathOptions path_options = LOG_PATH_OPTIONS_INIT;
135
 
  LogMessage *msg;
136
 
 
137
 
  for (i = 0; i < 100000; i++)
 
166
  LogMessage *msg, *tmpl;
 
167
  GTimeVal start, end;
 
168
  GSockAddr *sa;
 
169
  glong diff;
 
170
 
 
171
  iv_init();
 
172
  
 
173
  /* emulate main loop for LogQueue */
 
174
  main_loop_io_worker_set_thread_id(id);
 
175
  INIT_LIST_HEAD(&finish_callbacks);
 
176
 
 
177
  sa = g_sockaddr_inet_new("10.10.10.10", 1010);
 
178
  tmpl = log_msg_new(msg_str, msg_len, g_sockaddr_ref(sa), &parse_options);
 
179
  g_get_current_time(&start);
 
180
  for (i = 0; i < MESSAGES_PER_FEEDER; i++)
138
181
    {
139
 
      msg = log_msg_new(msg_str, strlen(msg_str), g_sockaddr_inet_new("10.10.10.10", 1010), &parse_options);
 
182
      msg = log_msg_clone_cow(tmpl, &path_options);
140
183
      log_msg_add_ack(msg, &path_options);
141
184
      msg->ack_func = test_ack;
142
185
 
143
 
      g_static_mutex_lock(&threaded_lock);
144
 
      if (!log_queue_push_tail(q, msg, &path_options))
145
 
        {
146
 
          fprintf(stderr, "Queue unable to consume enough messages: %d\n", fed_messages);
147
 
          return GUINT_TO_POINTER(1);
148
 
        }
149
 
      g_static_mutex_unlock(&threaded_lock);
 
186
      log_queue_push_tail(q, msg, &path_options);
 
187
      
 
188
      if ((i & 0xFF) == 0)
 
189
        main_loop_io_worker_invoke_finish_callbacks();
150
190
    }
 
191
  main_loop_io_worker_invoke_finish_callbacks();
 
192
  g_get_current_time(&end);
 
193
  diff = g_time_val_diff(&end, &start);
 
194
  g_static_mutex_lock(&tlock);
 
195
  sum_time += diff;
 
196
  g_static_mutex_unlock(&tlock);
 
197
  log_msg_unref(tmpl);
151
198
  return NULL;
152
199
}
153
200
 
160
207
  gboolean success;
161
208
  gint i;
162
209
 
163
 
  for (i = 0; i < 100000; i++)
 
210
  /* just to make sure time is properly cached */
 
211
  iv_init();
 
212
 
 
213
  for (i = 0; i < MESSAGES_SUM; i++)
164
214
    {
165
 
      g_static_mutex_lock(&threaded_lock);
 
215
      gint slept = 0;
166
216
      msg = NULL;
167
 
      success = log_queue_pop_head(q, &msg, &path_options, FALSE);
168
 
      g_static_mutex_unlock(&threaded_lock);
 
217
 
 
218
      do
 
219
        {
 
220
          success = log_queue_pop_head(q, &msg, &path_options, FALSE, FALSE);
 
221
          if (!success)
 
222
            {
 
223
              struct timespec ns;
 
224
 
 
225
              /* sleep 1 msec */
 
226
              ns.tv_sec = 0;
 
227
              ns.tv_nsec = 1000000;
 
228
              nanosleep(&ns, NULL);
 
229
              slept++;
 
230
              if (slept > 10000)
 
231
                {
 
232
                  /* slept for more than 10 seconds */
 
233
                  fprintf(stderr, "The wait for messages took too much time, i=%d\n", i);
 
234
                  return GUINT_TO_POINTER(1);
 
235
                }
 
236
            }
 
237
        }
 
238
      while (!success);
169
239
 
170
240
      g_assert(!success || (success && msg != NULL));
171
241
      if (!success)
181
251
  return NULL;
182
252
}
183
253
 
 
254
 
184
255
void
185
256
testcase_with_threads()
186
257
{
187
258
  LogQueue *q;
188
 
  GThread *thread_feed, *thread_consume;
189
 
  gint i;
 
259
  GThread *thread_feed[FEEDERS], *thread_consume;
 
260
  gpointer args[FEEDERS][2];
 
261
  gint i, j;
190
262
 
191
 
  for (i = 0; i < 100; i++)
 
263
  log_queue_set_max_threads(FEEDERS);
 
264
  for (i = 0; i < TEST_RUNS; i++)
192
265
    {
193
 
      q = log_queue_new(100000, 0, 64);
194
 
      thread_feed = g_thread_create(threaded_feed, q, TRUE, NULL);
 
266
      fprintf(stderr,"starting testrun: %d\n",i);
 
267
      q = log_queue_fifo_new(MESSAGES_SUM, NULL);
 
268
 
 
269
      for (j = 0; j < FEEDERS; j++)
 
270
        {
 
271
          args[j][0] = q;
 
272
          args[j][1] = GINT_TO_POINTER(j);
 
273
          fprintf(stderr,"starting feed thread %d\n",j);
 
274
          thread_feed[j] = g_thread_create(threaded_feed, args[j], TRUE, NULL);
 
275
        }
195
276
 
196
277
      thread_consume = g_thread_create(threaded_consume, q, TRUE, NULL);
197
 
      g_thread_join(thread_feed);
 
278
 
 
279
      for (j = 0; j < FEEDERS; j++)
 
280
      {
 
281
        fprintf(stderr,"waiting for feed thread %d\n",j);
 
282
        g_thread_join(thread_feed[j]);
 
283
      }
198
284
      g_thread_join(thread_consume);
199
285
 
200
 
      log_queue_free(q);
 
286
      log_queue_unref(q);
201
287
    }
 
288
  fprintf(stderr, "Feed speed: %.2lf\n", (double) TEST_RUNS * MESSAGES_SUM * 1000000 / sum_time);
202
289
}
203
 
#endif
204
290
 
205
291
int
206
292
main()
207
293
{
 
294
#if _AIX
 
295
  fprintf(stderr,"On AIX this testcase can't executed, because the overriding of main_loop_io_worker_register_finish_callback does not work\n");
 
296
  return 0;
 
297
#endif
208
298
  app_startup();
209
299
  putenv("TZ=MET-1METDST");
210
300
  tzset();
214
304
  msg_format_options_defaults(&parse_options);
215
305
  msg_format_options_init(&parse_options, configuration);
216
306
 
 
307
  fprintf(stderr,"Start testcase_with_threads\n");
 
308
  testcase_with_threads();
 
309
 
 
310
#if 1
 
311
  fprintf(stderr,"Start testcase_zero_diskbuf_alternating_send_acks\n");
217
312
  testcase_zero_diskbuf_alternating_send_acks();
 
313
  fprintf(stderr,"Start testcase_zero_diskbuf_and_normal_acks\n");
218
314
  testcase_zero_diskbuf_and_normal_acks();
 
315
#endif
219
316
  return 0;
220
317
}