47
typedef struct qWrkThrd_s {
48
pthread_t thrdID; /* thread ID */
49
qWrkCmd_t tCurrCmd; /* current command to be carried out by worker */
50
obj_t *pUsr; /* current user object being processed (or NULL if none) */
51
struct queue_s *pQueue; /* my queue (important if only the work thread instance is passed! */
52
int iThrd; /* my worker thread array index */
53
pthread_cond_t condInitDone; /* signaled when the thread startup is done (once per thread existance) */
55
} qWrkThrd_t; /* type for queue worker threads */
57
58
/* the queue object */
58
typedef struct queue_s {
61
bool bEnqOnly; /* does queue run in enqueue-only mode (1) or not (0)? */
62
bool bSaveOnShutdown;/* persists everthing on shutdown (if DA!)? 1-yes, 0-no */
63
bool bQueueStarted; /* has queueStart() been called on this queue? 1-yes, 0-no */
64
bool bQueueInDestruction;/* 1 if queue is in destruction process, 0 otherwise */
62
int nLogDeq; /* number of elements currently logically dequeued */
63
int bShutdownImmediate; /* should all workers cease processing messages? */
64
sbool bEnqOnly; /* does queue run in enqueue-only mode (1) or not (0)? */
65
sbool bSaveOnShutdown;/* persists everthing on shutdown (if DA!)? 1-yes, 0-no */
66
sbool bQueueStarted; /* has queueStart() been called on this queue? 1-yes, 0-no */
65
67
int iQueueSize; /* Current number of elements in the queue */
66
68
int iMaxQueueSize; /* how large can the queue grow? */
67
69
int iNumWorkerThreads;/* number of worker threads to use */
72
74
void *pUsr; /* a global, user-supplied pointer. Is passed back to consumer. */
73
75
int iUpdsSincePersist;/* nbr of queue updates since the last persist call */
74
76
int iPersistUpdCnt; /* persits queue info after this nbr of updates - 0 -> persist only on shutdown */
75
bool bSyncQueueFiles;/* if working with files, sync them after each write? */
77
sbool bSyncQueueFiles;/* if working with files, sync them after each write? */
76
78
int iHighWtrMrk; /* high water mark for disk-assisted memory queues */
77
79
int iLowWtrMrk; /* low water mark for disk-assisted memory queues */
78
80
int iDiscardMrk; /* if the queue is above this mark, low-severity messages are discarded */
79
81
int iFullDlyMrk; /* if the queue is above this mark, FULL_DELAYable message are put on hold */
80
82
int iLightDlyMrk; /* if the queue is above this mark, LIGHT_DELAYable message are put on hold */
81
83
int iDiscardSeverity;/* messages of this severity above are discarded on too-full queue */
82
bool bNeedDelQIF; /* does the QIF file need to be deleted when queue becomes empty? */
84
sbool bNeedDelQIF; /* does the QIF file need to be deleted when queue becomes empty? */
83
85
int toQShutdown; /* timeout for regular queue shutdown in ms */
84
86
int toActShutdown; /* timeout for long-running action shutdown in ms */
85
87
int toWrkShutdown; /* timeout for idle workers in ms, -1 means indefinite (0 is immediate) */
88
toDeleteLst_t *toDeleteLst;/* this queue's to-delete list */
86
89
int toEnq; /* enqueue timeout */
90
int iDeqBatchSize; /* max number of elements that shall be dequeued at once */
87
91
/* rate limiting settings (will be expanded) */
88
92
int iDeqSlowdown; /* slow down dequeue by specified nbr of microseconds */
89
93
/* end rate limiting */
97
101
* applied to detect user configuration errors (and tell me how should we detect what
98
102
* the user really wanted...). -- rgerhards, 2008-04-02
100
/* ane dequeue time window */
101
rsRetVal (*pConsumer)(void *,void*); /* user-supplied consumer function for dequeued messages */
104
/* end dequeue time window */
105
rsRetVal (*pConsumer)(void *,batch_t*,int*); /* user-supplied consumer function for dequeued messages */
102
106
/* calling interface for pConsumer: arg1 is the global user pointer from this structure, arg2 is the
103
* user pointer that was dequeued (actual sample: for actions, arg1 is the pAction and arg2 is pointer
105
* rgerhards, 2008-01-28
107
* user pointer array that was dequeued (actual sample: for actions, arg1 is the pAction and arg2
108
* is pointer to an array of message message pointers), arg3 is a pointer to an interger which is zero
109
* during normal operations and one if the consumer must urgently shut down.
107
111
/* type-specific handlers (set during construction) */
108
112
rsRetVal (*qConstruct)(struct queue_s *pThis);
109
113
rsRetVal (*qDestruct)(struct queue_s *pThis);
110
114
rsRetVal (*qAdd)(struct queue_s *pThis, void *pUsr);
111
rsRetVal (*qDel)(struct queue_s *pThis, void **ppUsr);
115
rsRetVal (*qDeq)(struct queue_s *pThis, void **ppUsr);
116
rsRetVal (*qDel)(struct queue_s *pThis);
112
117
/* end type-specific handler */
118
/* public entry points (set during construction, permit to set best algorithm for params selected) */
119
rsRetVal (*MultiEnq)(qqueue_t *pThis, multi_submit_t *pMultiSub);
120
/* end public entry points */
113
121
/* synchronization variables */
114
122
pthread_mutex_t mutThrdMgmt; /* mutex for the queue's thread management */
115
123
pthread_mutex_t *mut; /* mutex for enqueing and dequeueing messages */
116
124
pthread_cond_t notFull, notEmpty;
117
125
pthread_cond_t belowFullDlyWtrMrk; /* below eFLOWCTL_FULL_DELAY watermark */
118
126
pthread_cond_t belowLightDlyWtrMrk; /* below eFLOWCTL_FULL_DELAY watermark */
119
pthread_cond_t condDAReady;/* signalled when the DA queue is fully initialized and ready for processing */
120
int bChildIsDone; /* set to 1 when the child DA queue has finished processing, 0 otherwise */
121
127
int bThrdStateChanged; /* at least one thread state has changed if 1 */
122
128
/* end sync variables */
123
129
/* the following variables are always present, because they
132
138
int iNumberFiles; /* how many files make up the queue? */
133
139
int64 iMaxFileSize; /* max size for a single queue file */
134
140
int64 sizeOnDiskMax; /* maximum size on disk allowed */
141
qDeqID deqIDAdd; /* next dequeue ID to use during add to queue store */
142
qDeqID deqIDDel; /* queue store delete position */
135
143
int bIsDA; /* is this queue disk assisted? */
136
int bRunsDA; /* is this queue actually *running* disk assisted? */
137
144
struct queue_s *pqDA; /* queue for disk-assisted modes */
138
145
struct queue_s *pqParent;/* pointer to the parent (if this is a child queue) */
139
146
int bDAEnqOnly; /* EnqOnly setting for DA queue */
140
/* some data elements for the queueUngetObj() functionality. This list should always be short
141
* and is always kept in memory
143
qLinkedList_t *pUngetRoot;
144
qLinkedList_t *pUngetLast;
145
int iUngottenObjs; /* number of objects currently in the "ungotten" list */
146
147
/* now follow queueing mode specific data elements */
147
148
union { /* different data elements based on queue type (qType) */
150
long deqhead, head, tail;
150
151
void** pBuf; /* the queued user data structure */
153
qLinkedList_t *pRoot;
154
qLinkedList_t *pDeqRoot;
155
qLinkedList_t *pDelRoot;
154
156
qLinkedList_t *pLast;
157
159
int64 sizeOnDisk; /* current amount of disk space used */
158
160
int64 bytesRead; /* number of bytes read from current (undeleted!) file */
159
strm_t *pWrite; /* current file to be written */
160
strm_t *pRead; /* current file to be read */
161
strm_t *pWrite; /* current file to be written */
162
strm_t *pReadDeq; /* current file for dequeueing */
163
strm_t *pReadDel; /* current file for deleting */
165
/* some symbolic constants for easier reference */
166
#define QUEUE_MODE_ENQDEQ 0
167
#define QUEUE_MODE_ENQONLY 1
169
#define QUEUE_IDX_DA_WORKER 0 /* index for the DA worker (fixed) */
170
#define QUEUE_PTR_DA_WORKER(x) (&((pThis)->pWrkThrds[0]))
166
DEF_ATOMIC_HELPER_MUT(mutQueueSize);
167
DEF_ATOMIC_HELPER_MUT(mutLogDeq);
168
/* for statistics subsystem */
169
statsobj_t *statsobj;
170
STATSCOUNTER_DEF(ctrEnqueued, mutCtrEnqueued);
171
STATSCOUNTER_DEF(ctrFull, mutCtrFull);
172
176
/* the define below is an "eternal" timeout for the timeout settings which require a value.
173
177
* It is one day, which is not really eternal, but comes close to it if we think about
180
184
rsRetVal qqueueDestruct(qqueue_t **ppThis);
181
rsRetVal qqueueMultiEnqObj(qqueue_t *pThis, multi_submit_t *pMultiSub);
185
rsRetVal qqueueEnqObjDirect(qqueue_t *pThis, void *pUsr);
182
186
rsRetVal qqueueEnqObj(qqueue_t *pThis, flowControl_t flwCtlType, void *pUsr);
183
187
rsRetVal qqueueStart(qqueue_t *pThis);
184
188
rsRetVal qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize);
185
189
rsRetVal qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefix);
186
190
rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads,
187
int iMaxQueueSize, rsRetVal (*pConsumer)(void*,void*));
191
int iMaxQueueSize, rsRetVal (*pConsumer)(void*,batch_t*, int*));
192
rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch);
188
193
PROTOTYPEObjClassInit(qqueue);
189
194
PROTOTYPEpropSetMeth(qqueue, iPersistUpdCnt, int);
190
195
PROTOTYPEpropSetMeth(qqueue, bSyncQueueFiles, int);
203
208
PROTOTYPEpropSetMeth(qqueue, pUsr, void*);
204
209
PROTOTYPEpropSetMeth(qqueue, iDeqSlowdown, int);
205
210
PROTOTYPEpropSetMeth(qqueue, sizeOnDiskMax, int64);
211
PROTOTYPEpropSetMeth(qqueue, iDeqBatchSize, int);
206
212
#define qqueueGetID(pThis) ((unsigned long) pThis)
208
214
#endif /* #ifndef QUEUE_H_INCLUDED */