~ubuntu-branches/ubuntu/trusty/rsyslog/trusty

« back to all changes in this revision

Viewing changes to runtime/queue.h

  • Committer: Dave Walker (Daviey)
  • Author(s): Scott Moser
  • Date: 2011-06-17 20:59:38 UTC
  • mfrom: (36.1.8 oneiric.merge)
  • Revision ID: davewalker@ubuntu.com-20110617205938-pfkizxz2wsgzi2ot
Tags: 5.8.1-1ubuntu1
* Resynchronise with Debian (LP: #794230).  Remaining changes:
  - Run as rsyslog:rsyslog, set $FileOwner to syslog
  - Replace init script with debian/rsyslog.upstart.
  - debian/rsyslog.logrotate: Use reload command to restart rsyslog
  - debian/rsyslog.conf: enable $RepeatedMsgReduction 
    to avoid bloating the syslog file (LP #453444)
  - Add debian/rsyslog.dmesg.upstart to save initial dmesg into a file.
    Install it in debian/rules.
  - debian/50-default.conf: set of default rules for syslog (forwarded to
    Debian #603160). remove file in postrm on purge. manage with ucf.
  - debian/rules: build with LDFLAGS=""
* Dropped:
  - debian/control: Bump build-dependency on debhelper
    debian now depends on dh >= 8
* New upstream release.
* Bump Standards-Version to 3.9.2. No further changes.
* Enable and install impstats module. (Closes: #620114)
* Update logcheck rule. (Closes: #616659)
* debian/rsyslog.init: Set correct compat level (5).
* The way rsyslog processes SIGHUP has changed. It no longer does a reload
  of its configuration, but simply closes all open files. To apply a changed
  configuration, rsyslogd needs to be restarted now.
  - Drop "reload" action from debian/rsyslog.init, map "force-reload" to
    "restart". (Closes: #580897)
  - Add "rotate" action to debian/rsyslog.init which sends SIGHUP to
    rsyslogd. Use that in debian/rsyslog.logrotate. (Closes: #626365)
  - Update debian/rsyslog-mysql.postinst and rsyslog-pgsql.postinst to use
    restart instead of reload.
  - Add a NEWS file explaining the changed SIGHUP handling.
* New upstream stable release.
* New upstream release.
  - Properly handle ANSI SQL strings in ompgsql. (Closes: #600479)
* New upstream release.
* debian/patches/02-pmaixforwardedfrom_type_nokeep.patch
  - Remove, merged upstream.
* debian/patches/03-epoll_create1-fallback.patch
  - Remove, merged upstream.
* debian/patches/03-epoll_create1-fallback.patch
  - If epoll_create1() is not available during runtime, fall back to
    epoll_create(). This fixes remote syslog when runnig rsyslog on a
    lenny kernel. (Closes: #617996)
* New upstream release.
* debian/rsyslog.links
  - Create symlink for rsyslog.service in multi-user.target.wants so rsyslog
    is enabled by default when using systemd.
* debian/patches/02-pmaixforwardedfrom_type_nokeep.patch
  - Fix build failure in aixforwardedfrom parser module by setting the
    module type to NOKEEP.
* debian/rsyslog.preinst
  - Remove old rsyslog.socket symlink from sockets.target.wants on upgrades
    as rsyslog uses syslog.socket now which is provided by systemd.
* debian/rsyslog.install
  - Stop installing rsyslog.socket.
* New upstream release.
* New upstream release.
  - Fix regression in imuxsock plugin which did no longer sanitize received
    messages. This makes 02-cleanup-trailing-lf.patch obsolete and also
    fixes the SQL syntax errors in the mysql output if the input contained
    NUL bytes. Closes: #614061
* Enable and install omprog output plugin. Closes: #552095
* Improve package description. Closes: #612948
  Thanks to Justin B Rye for the patch.
* debian/patches/02-cleanup-trailing-lf.patch
  - Fix regression in imuxsock plugin which did not remove a trailing LF
    anymore. Patch cherry-picked from upstream Git. Closes: #612829
* New upstream release.
* Enable and install parser modules.
* New upstream release.
* Upload to unstable.
* debian/patches/02-typo_fix_equation_sign.patch
  - Removed, merged upstream.
* debian/patches/03-atomic_operations.patch
  - Removed, merged upstream.
* debian/patches/03-atomic_operations.patch
  - Fix build failures on platforms which don't have 64 bit atomic
    operations. Patch cherry-picked from upstream Git. Closes: #600930
* New upstream development release.
* Remove patches, merged upstream
  - debian/patches/02-install_also_rsyslog_socket.patch
  - debian/patches/02-tls_loop_fix.patch
* debian/patches/02-typo_fix_equation_sign.patch
  - Fix small typo ("equation sign"). Closes: #575589
* debian/rsyslog.postinst
  - Remove pre-lenny migration code to rotate old log files from sysklogd.
* New upstream development release.
* debian/rsyslog.install
  - Install omruleset.so plugin: http://www.rsyslog.com/doc/omruleset.html
* debian/rsyslog.default
  - Start rsyslogd with native -c5 mode.
* Install systemd unit files which allow to run rsyslog in socket activation
  mode when systemd is used.
* debian/patches/02-install_also_rsyslog_socket.patch
  - When enabling rsyslog.service also enable rsyslog.socket. Patch
    cherry-picked from upstream Git.
* Bump debhelper compatibility level to 8. Update Build-Depends accordingly.

Show diffs side-by-side

added added

removed removed

Lines of Context:
27
27
#include <pthread.h>
28
28
#include "obj.h"
29
29
#include "wtp.h"
 
30
#include "batch.h"
30
31
#include "stream.h"
 
32
#include "statsobj.h"
 
33
 
 
34
/* support for the toDelete list */
 
35
typedef struct toDeleteLst_s toDeleteLst_t;
 
36
struct toDeleteLst_s {
 
37
        qDeqID  deqID;
 
38
        int     nElemDeq;       /* numbe of elements that were dequeued and as such must now be discarded */
 
39
        struct toDeleteLst_s *pNext;
 
40
};
 
41
 
31
42
 
32
43
/* queue types */
33
44
typedef enum {
44
55
} qLinkedList_t;
45
56
 
46
57
 
47
 
typedef struct qWrkThrd_s {
48
 
        pthread_t thrdID;  /* thread ID */
49
 
        qWrkCmd_t tCurrCmd; /* current command to be carried out by worker */
50
 
        obj_t *pUsr;        /* current user object being processed (or NULL if none) */
51
 
        struct queue_s *pQueue; /* my queue (important if only the work thread instance is passed! */
52
 
        int iThrd;      /* my worker thread array index */
53
 
        pthread_cond_t condInitDone; /* signaled when the thread startup is done (once per thread existance) */
54
 
        pthread_mutex_t mut;
55
 
} qWrkThrd_t;   /* type for queue worker threads */
56
 
 
57
58
/* the queue object */
58
 
typedef struct queue_s {
 
59
struct queue_s {
59
60
        BEGINobjInstance;
60
61
        queueType_t     qType;
61
 
        bool    bEnqOnly;       /* does queue run in enqueue-only mode (1) or not (0)? */
62
 
        bool    bSaveOnShutdown;/* persists everthing on shutdown (if DA!)? 1-yes, 0-no */
63
 
        bool    bQueueStarted;  /* has queueStart() been called on this queue? 1-yes, 0-no */
64
 
        bool    bQueueInDestruction;/* 1 if queue is in destruction process, 0 otherwise */
 
62
        int     nLogDeq;        /* number of elements currently logically dequeued */
 
63
        int     bShutdownImmediate; /* should all workers cease processing messages? */
 
64
        sbool   bEnqOnly;       /* does queue run in enqueue-only mode (1) or not (0)? */
 
65
        sbool   bSaveOnShutdown;/* persists everthing on shutdown (if DA!)? 1-yes, 0-no */
 
66
        sbool   bQueueStarted;  /* has queueStart() been called on this queue? 1-yes, 0-no */
65
67
        int     iQueueSize;     /* Current number of elements in the queue */
66
68
        int     iMaxQueueSize;  /* how large can the queue grow? */
67
69
        int     iNumWorkerThreads;/* number of worker threads to use */
72
74
        void    *pUsr;          /* a global, user-supplied pointer. Is passed back to consumer. */
73
75
        int     iUpdsSincePersist;/* nbr of queue updates since the last persist call */
74
76
        int     iPersistUpdCnt; /* persits queue info after this nbr of updates - 0 -> persist only on shutdown */
75
 
        bool    bSyncQueueFiles;/* if working with files, sync them after each write? */
 
77
        sbool   bSyncQueueFiles;/* if working with files, sync them after each write? */
76
78
        int     iHighWtrMrk;    /* high water mark for disk-assisted memory queues */
77
79
        int     iLowWtrMrk;     /* low water mark for disk-assisted memory queues */
78
80
        int     iDiscardMrk;    /* if the queue is above this mark, low-severity messages are discarded */
79
81
        int     iFullDlyMrk;    /* if the queue is above this mark, FULL_DELAYable message are put on hold */
80
82
        int     iLightDlyMrk;   /* if the queue is above this mark, LIGHT_DELAYable message are put on hold */
81
83
        int     iDiscardSeverity;/* messages of this severity above are discarded on too-full queue */
82
 
        bool    bNeedDelQIF;    /* does the QIF file need to be deleted when queue becomes empty? */
 
84
        sbool   bNeedDelQIF;    /* does the QIF file need to be deleted when queue becomes empty? */
83
85
        int     toQShutdown;    /* timeout for regular queue shutdown in ms */
84
86
        int     toActShutdown;  /* timeout for long-running action shutdown in ms */
85
87
        int     toWrkShutdown;  /* timeout for idle workers in ms, -1 means indefinite (0 is immediate) */
 
88
        toDeleteLst_t *toDeleteLst;/* this queue's to-delete list */
86
89
        int     toEnq;          /* enqueue timeout */
 
90
        int     iDeqBatchSize;  /* max number of elements that shall be dequeued at once */
87
91
        /* rate limiting settings (will be expanded) */
88
92
        int     iDeqSlowdown; /* slow down dequeue by specified nbr of microseconds */
89
93
        /* end rate limiting */
97
101
         * applied to detect user configuration errors (and tell me how should we detect what
98
102
         * the user really wanted...). -- rgerhards, 2008-04-02
99
103
         */
100
 
        /* ane dequeue time window */
101
 
        rsRetVal (*pConsumer)(void *,void*); /* user-supplied consumer function for dequeued messages */
 
104
        /* end dequeue time window */
 
105
        rsRetVal (*pConsumer)(void *,batch_t*,int*); /* user-supplied consumer function for dequeued messages */
102
106
        /* calling interface for pConsumer: arg1 is the global user pointer from this structure, arg2 is the
103
 
         * user pointer that was dequeued (actual sample: for actions, arg1 is the pAction and arg2 is pointer
104
 
         * to message)
105
 
         * rgerhards, 2008-01-28
 
107
         * user pointer array that was dequeued (actual sample: for actions, arg1 is the pAction and arg2
 
108
         * is pointer to an array of message message pointers), arg3 is a pointer to an interger which is zero
 
109
         * during normal operations and one if the consumer must urgently shut down.
106
110
         */
107
111
        /* type-specific handlers (set during construction) */
108
112
        rsRetVal (*qConstruct)(struct queue_s *pThis);
109
113
        rsRetVal (*qDestruct)(struct queue_s *pThis);
110
114
        rsRetVal (*qAdd)(struct queue_s *pThis, void *pUsr);
111
 
        rsRetVal (*qDel)(struct queue_s *pThis, void **ppUsr);
 
115
        rsRetVal (*qDeq)(struct queue_s *pThis, void **ppUsr);
 
116
        rsRetVal (*qDel)(struct queue_s *pThis);
112
117
        /* end type-specific handler */
 
118
        /* public entry points (set during construction, permit to set best algorithm for params selected) */
 
119
        rsRetVal (*MultiEnq)(qqueue_t *pThis, multi_submit_t *pMultiSub);
 
120
        /* end public entry points */
113
121
        /* synchronization variables */
114
122
        pthread_mutex_t mutThrdMgmt; /* mutex for the queue's thread management */
115
123
        pthread_mutex_t *mut; /* mutex for enqueing and dequeueing messages */
116
124
        pthread_cond_t notFull, notEmpty;
117
125
        pthread_cond_t belowFullDlyWtrMrk; /* below eFLOWCTL_FULL_DELAY watermark */
118
126
        pthread_cond_t belowLightDlyWtrMrk; /* below eFLOWCTL_FULL_DELAY watermark */
119
 
        pthread_cond_t condDAReady;/* signalled when the DA queue is fully initialized and ready for processing */
120
 
        int bChildIsDone;               /* set to 1 when the child DA queue has finished processing, 0 otherwise */
121
127
        int bThrdStateChanged;          /* at least one thread state has changed if 1 */
122
128
        /* end sync variables */
123
129
        /* the following variables are always present, because they
132
138
        int iNumberFiles;       /* how many files make up the queue? */
133
139
        int64 iMaxFileSize;     /* max size for a single queue file */
134
140
        int64 sizeOnDiskMax;    /* maximum size on disk allowed */
 
141
        qDeqID deqIDAdd;        /* next dequeue ID to use during add to queue store */
 
142
        qDeqID deqIDDel;        /* queue store delete position */
135
143
        int bIsDA;              /* is this queue disk assisted? */
136
 
        int bRunsDA;            /* is this queue actually *running* disk assisted? */
137
144
        struct queue_s *pqDA;   /* queue for disk-assisted modes */
138
145
        struct queue_s *pqParent;/* pointer to the parent (if this is a child queue) */
139
146
        int     bDAEnqOnly;     /* EnqOnly setting for DA queue */
140
 
        /* some data elements for the queueUngetObj() functionality. This list should always be short
141
 
         * and is always kept in memory
142
 
         */
143
 
        qLinkedList_t *pUngetRoot;
144
 
        qLinkedList_t *pUngetLast;
145
 
        int iUngottenObjs;      /* number of objects currently in the "ungotten" list */
146
147
        /* now follow queueing mode specific data elements */
147
148
        union {                 /* different data elements based on queue type (qType) */
148
149
                struct {
149
 
                        long head, tail;
 
150
                        long deqhead, head, tail;
150
151
                        void** pBuf;            /* the queued user data structure */
151
152
                } farray;
152
153
                struct {
153
 
                        qLinkedList_t *pRoot;
 
154
                        qLinkedList_t *pDeqRoot;
 
155
                        qLinkedList_t *pDelRoot;
154
156
                        qLinkedList_t *pLast;
155
157
                } linklist;
156
158
                struct {
157
159
                        int64 sizeOnDisk; /* current amount of disk space used */
158
160
                        int64 bytesRead;  /* number of bytes read from current (undeleted!) file */
159
 
                        strm_t *pWrite; /* current file to be written */
160
 
                        strm_t *pRead;  /* current file to be read */
 
161
                        strm_t *pWrite;   /* current file to be written */
 
162
                        strm_t *pReadDeq; /* current file for dequeueing */
 
163
                        strm_t *pReadDel; /* current file for deleting */
161
164
                } disk;
162
165
        } tVars;
163
 
} qqueue_t;
164
 
 
165
 
/* some symbolic constants for easier reference */
166
 
#define QUEUE_MODE_ENQDEQ 0
167
 
#define QUEUE_MODE_ENQONLY 1
168
 
 
169
 
#define QUEUE_IDX_DA_WORKER 0 /* index for the DA worker (fixed) */
170
 
#define QUEUE_PTR_DA_WORKER(x) (&((pThis)->pWrkThrds[0]))
 
166
        DEF_ATOMIC_HELPER_MUT(mutQueueSize);
 
167
        DEF_ATOMIC_HELPER_MUT(mutLogDeq);
 
168
        /* for statistics subsystem */
 
169
        statsobj_t *statsobj;
 
170
        STATSCOUNTER_DEF(ctrEnqueued, mutCtrEnqueued);
 
171
        STATSCOUNTER_DEF(ctrFull, mutCtrFull);
 
172
        int ctrMaxqsize;
 
173
};
 
174
 
171
175
 
172
176
/* the define below is an "eternal" timeout for the timeout settings which require a value.
173
177
 * It is one day, which is not really eternal, but comes close to it if we think about
178
182
 
179
183
/* prototypes */
180
184
rsRetVal qqueueDestruct(qqueue_t **ppThis);
181
 
rsRetVal qqueueMultiEnqObj(qqueue_t *pThis, multi_submit_t *pMultiSub);
 
185
rsRetVal qqueueEnqObjDirect(qqueue_t *pThis, void *pUsr);
182
186
rsRetVal qqueueEnqObj(qqueue_t *pThis, flowControl_t flwCtlType, void *pUsr);
183
187
rsRetVal qqueueStart(qqueue_t *pThis);
184
188
rsRetVal qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize);
185
189
rsRetVal qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefix);
186
190
rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads,
187
 
                        int iMaxQueueSize, rsRetVal (*pConsumer)(void*,void*));
 
191
                        int iMaxQueueSize, rsRetVal (*pConsumer)(void*,batch_t*, int*));
 
192
rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch);
188
193
PROTOTYPEObjClassInit(qqueue);
189
194
PROTOTYPEpropSetMeth(qqueue, iPersistUpdCnt, int);
190
195
PROTOTYPEpropSetMeth(qqueue, bSyncQueueFiles, int);
203
208
PROTOTYPEpropSetMeth(qqueue, pUsr, void*);
204
209
PROTOTYPEpropSetMeth(qqueue, iDeqSlowdown, int);
205
210
PROTOTYPEpropSetMeth(qqueue, sizeOnDiskMax, int64);
 
211
PROTOTYPEpropSetMeth(qqueue, iDeqBatchSize, int);
206
212
#define qqueueGetID(pThis) ((unsigned long) pThis)
207
213
 
208
214
#endif /* #ifndef QUEUE_H_INCLUDED */