~ubuntu-branches/ubuntu/karmic/rsyslog/karmic-200908151517

« back to all changes in this revision

Viewing changes to queue.c

  • Committer: Bazaar Package Importer
  • Author(s): Michael Biebl
  • Date: 2008-07-23 02:22:32 UTC
  • mfrom: (1.1.5 upstream)
  • Revision ID: james.westby@ubuntu.com-20080723022232-496osxty0v9vvw9g
Tags: 3.18.1-1
* New upstream release. Closes: #490445
  - List Debian in doc/rsyslog_packages.html. Closes: #488870
  - Fix compilation of imklog module on GNU/kFreeBSD. Closes: #491193
* debian/rsyslog-doc.install
  - Install the example config file. Closes: #488860
* debian/rules
  - Enable mail output plugin.
  - Make sure all directories are created by calling dh_installdirs for both
    binary-arch and binary-indep. Closes: #491459
* debian/rsyslog.install
  - Install mail output plugin (ommail.so).
* debian/control
  - Add Suggests www-browser to rsyslog-doc as the package contains mostly
    html documents.
  - Update feature list.
  - Adjust priorities, set rsyslog priority to important.

Show diffs side-by-side

added added

removed removed

Lines of Context:
39
39
#include <fcntl.h>
40
40
#include <unistd.h>
41
41
#include <sys/stat.h>    /* required for HP UX */
 
42
#include <time.h>
42
43
#include <errno.h>
43
44
 
44
45
#include "rsyslog.h"
56
57
/* forward-definitions */
57
58
rsRetVal queueChkPersist(queue_t *pThis);
58
59
static rsRetVal queueSetEnqOnly(queue_t *pThis, int bEnqOnly, int bLockMutex);
 
60
static rsRetVal queueRateLimiter(queue_t *pThis);
59
61
static int queueChkStopWrkrDA(queue_t *pThis);
60
62
static int queueIsIdleDA(queue_t *pThis);
61
63
static rsRetVal queueConsumerDA(queue_t *pThis, wti_t *pWti, int iCancelStateSave);
272
274
        CHKiRet(queueSettoActShutdown(pThis->pqDA, pThis->toActShutdown));
273
275
        CHKiRet(queueSettoEnq(pThis->pqDA, pThis->toEnq));
274
276
        CHKiRet(queueSetEnqOnly(pThis->pqDA, pThis->bDAEnqOnly, MUTEX_ALREADY_LOCKED));
 
277
        CHKiRet(queueSetiDeqtWinFromHr(pThis->pqDA, pThis->iDeqtWinFromHr));
 
278
        CHKiRet(queueSetiDeqtWinToHr(pThis->pqDA, pThis->iDeqtWinToHr));
275
279
        CHKiRet(queueSetiHighWtrMrk(pThis->pqDA, 0));
276
280
        CHKiRet(queueSetiDiscardMrk(pThis->pqDA, 0));
277
281
        if(pThis->toQShutdown == 0) {
845
849
 
846
850
        pThis->tVars.disk.sizeOnDisk += nWriteCount;
847
851
 
 
852
        /* The following line is a backport from 3.19.10 - fixes mem leak */
 
853
        objDestruct(pUsr);
848
854
        dbgoprint((obj_t*) pThis, "write wrote %lld octets to disk, queue disk size now %lld octets\n",
849
855
                   nWriteCount, pThis->tVars.disk.sizeOnDisk);
850
856
 
1268
1274
        pThis->iMaxQueueSize = iMaxQueueSize;
1269
1275
        pThis->pConsumer = pConsumer;
1270
1276
        pThis->iNumWorkerThreads = iWorkerThreads;
 
1277
        pThis->iDeqtWinToHr = 25; /* disable time-windowed dequeuing by default */
1271
1278
 
1272
1279
        pThis->pszFilePrefix = NULL;
1273
1280
        pThis->qType = qType;
1412
1419
         * on the nail [exact value]) -- rgerhards, 2008-03-14
1413
1420
         */
1414
1421
        if(iQueueSize < pThis->iFullDlyMrk) {
1415
 
dbgoprint((obj_t*) pThis, "queue size %d below FullDlyMrk %d\n", iQueueSize, pThis->iFullDlyMrk);
1416
1422
                pthread_cond_broadcast(&pThis->belowFullDlyWtrMrk);
1417
1423
        }
1418
1424
 
1419
1425
        if(iQueueSize < pThis->iLightDlyMrk) {
1420
 
dbgoprint((obj_t*) pThis, "queue size %d below LightDlyMrk %d\n", iQueueSize, pThis->iLightDlyMrk);
1421
1426
                pthread_cond_broadcast(&pThis->belowLightDlyWtrMrk);
1422
1427
        }
1423
1428
 
1451
1456
}
1452
1457
 
1453
1458
 
 
1459
/* The rate limiter
 
1460
 *
 
1461
 * Here we may wait if a dequeue time window is defined or if we are
 
1462
 * rate-limited. TODO: If we do so, we should also look into the
 
1463
 * way new worker threads are spawned. Obviously, it doesn't make much
 
1464
 * sense to spawn additional worker threads when none of them can do any
 
1465
 * processing. However, it is deemed acceptable to allow this for an initial
 
1466
 * implementation of the timeframe/rate limiting feature.
 
1467
 * Please also note that these feature could also be implemented at the action
 
1468
 * level. However, that would limit them to be used together with actions. We have
 
1469
 * taken the broader approach, moving it right into the queue. This is even
 
1470
 * necessary if we want to prevent spawning of multiple unnecessary worker
 
1471
 * threads as described above. -- rgerhards, 2008-04-02
 
1472
 *
 
1473
 *
 
1474
 * time window: tCurr is current time; tFrom is start time, tTo is end time (in mil 24h format).
 
1475
 * We may have tFrom = 4, tTo = 10 --> run from 4 to 10 hrs. nice and happy
 
1476
 * we may also have tFrom= 22, tTo = 4 -> run from 10pm to 4am, which is actually two
 
1477
 *     windows: 0-4; 22-23:59
 
1478
 * so when to run? Let's assume we have 3am
 
1479
 *
 
1480
 * if(tTo < tFrom) {
 
1481
 *      if(tCurr < tTo [3 < 4] || tCurr > tFrom [3 > 22])
 
1482
 *              do work
 
1483
 *      else
 
1484
 *              sleep for tFrom - tCurr "hours" [22 - 5 --> 17]
 
1485
 * } else {
 
1486
 *      if(tCurr >= tFrom [3 >= 4] && tCurr < tTo [3 < 10])
 
1487
 *              do work
 
1488
 *      else
 
1489
 *              sleep for tTo - tCurr "hours" [4 - 3 --> 1]
 
1490
 * }
 
1491
 *
 
1492
 * Bottom line: we need to check which type of window we have and need to adjust our
 
1493
 * logic accordingly. Of course, sleep calculations need to be done up to the minute, 
 
1494
 * but you get the idea from the code above.
 
1495
 */
 
1496
static rsRetVal
 
1497
queueRateLimiter(queue_t *pThis)
 
1498
{
 
1499
        DEFiRet;
 
1500
        int iDelay;
 
1501
        int iHrCurr;
 
1502
        time_t tCurr;
 
1503
        struct tm m;
 
1504
 
 
1505
        ISOBJ_TYPE_assert(pThis, queue);
 
1506
 
 
1507
        dbgoprint((obj_t*) pThis, "entering rate limiter\n");
 
1508
 
 
1509
        iDelay = 0;
 
1510
        if(pThis->iDeqtWinToHr != 25) { /* 25 means disabled */
 
1511
                /* time calls are expensive, so only do them when needed */
 
1512
                time(&tCurr);
 
1513
                localtime_r(&tCurr, &m);
 
1514
                iHrCurr = m.tm_hour;
 
1515
 
 
1516
                if(pThis->iDeqtWinToHr < pThis->iDeqtWinFromHr) {
 
1517
                        if(iHrCurr < pThis->iDeqtWinToHr || iHrCurr > pThis->iDeqtWinFromHr) {
 
1518
                                ; /* do not delay */
 
1519
                        } else {
 
1520
                                iDelay = (pThis->iDeqtWinFromHr - iHrCurr) * 3600;
 
1521
                                /* this time, we are already into the next hour, so we need
 
1522
                                 * to subtract our current minute and seconds.
 
1523
                                 */
 
1524
                                iDelay -= m.tm_min * 60;
 
1525
                                iDelay -= m.tm_sec;
 
1526
                        }
 
1527
                } else {
 
1528
                        if(iHrCurr >= pThis->iDeqtWinFromHr && iHrCurr < pThis->iDeqtWinToHr) {
 
1529
                                ; /* do not delay */
 
1530
                        } else {
 
1531
                                if(iHrCurr < pThis->iDeqtWinFromHr) {
 
1532
                                        iDelay = (pThis->iDeqtWinFromHr - iHrCurr - 1) * 3600; /* -1 as we are already in the hour */
 
1533
                                        iDelay += (60 - m.tm_min) * 60;
 
1534
                                        iDelay += 60 - m.tm_sec;
 
1535
                                } else {
 
1536
                                        iDelay = (24 - iHrCurr + pThis->iDeqtWinFromHr) * 3600;
 
1537
                                        /* this time, we are already into the next hour, so we need
 
1538
                                         * to subtract our current minute and seconds.
 
1539
                                         */
 
1540
                                        iDelay -= m.tm_min * 60;
 
1541
                                        iDelay -= m.tm_sec;
 
1542
                                }
 
1543
                        }
 
1544
                }
 
1545
        }
 
1546
 
 
1547
        if(iDelay > 0) {
 
1548
                dbgoprint((obj_t*) pThis, "outside dequeue time window, delaying %d seconds\n", iDelay);
 
1549
                srSleep(iDelay, 0);
 
1550
        }
 
1551
 
 
1552
        RETiRet;
 
1553
}
 
1554
 
 
1555
 
 
1556
 
1454
1557
/* This is the queue consumer in the regular (non-DA) case. It is 
1455
1558
 * protected by the queue mutex, but MUST release it as soon as possible.
1456
1559
 * rgerhards, 2008-01-21
1691
1794
        lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:Reg", obj.GetName((obj_t*) pThis));
1692
1795
        CHKiRet(wtpConstruct            (&pThis->pWtpReg));
1693
1796
        CHKiRet(wtpSetDbgHdr            (pThis->pWtpReg, pszBuf, lenBuf));
 
1797
        CHKiRet(wtpSetpfRateLimiter     (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) queueRateLimiter));
1694
1798
        CHKiRet(wtpSetpfChkStopWrkr     (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) queueChkStopWrkrReg));
1695
1799
        CHKiRet(wtpSetpfIsIdle          (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) queueIsIdleReg));
1696
1800
        CHKiRet(wtpSetpfDoWork          (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) queueConsumerReg));
2152
2256
 
2153
2257
/* some simple object access methods */
2154
2258
DEFpropSetMeth(queue, iPersistUpdCnt, int);
 
2259
DEFpropSetMeth(queue, iDeqtWinFromHr, int);
 
2260
DEFpropSetMeth(queue, iDeqtWinToHr, int);
2155
2261
DEFpropSetMeth(queue, toQShutdown, long);
2156
2262
DEFpropSetMeth(queue, toActShutdown, long);
2157
2263
DEFpropSetMeth(queue, toWrkShutdown, long);
2214
2320
        OBJSetMethodHandler(objMethod_SETPROPERTY, queueSetProperty);
2215
2321
ENDObjClassInit(queue)
2216
2322
 
2217
 
/*
2218
 
 * vi:set ai:
 
2323
/* vi:set ai:
2219
2324
 */