~arges/ubuntu/quantal/rsyslog/fix-lp1059592

« back to all changes in this revision

Viewing changes to runtime/stream.c

Tags: 5.7.3-1
* New upstream release.
* Upload to unstable.
* debian/patches/02-typo_fix_equation_sign.patch
  - Removed, merged upstream.
* debian/patches/03-atomic_operations.patch
  - Removed, merged upstream.

Show diffs side-by-side

added added

removed removed

Lines of Context:
60
60
#  include <sys/prctl.h>
61
61
#endif
62
62
 
63
 
#define inline
 
63
/* some platforms do not have large file support :( */
 
64
#ifndef O_LARGEFILE
 
65
#  define O_LARGEFILE 0
 
66
#endif
 
67
#ifndef HAVE_LSEEK64
 
68
   typedef  off_t off64_t;
 
69
#  define lseek64(fd, offset, whence) lseek(fd, offset, whence)
 
70
#endif
64
71
 
65
72
/* static data */
66
73
DEFobjStaticHelpers
214
221
                iFlags |= O_NONBLOCK;
215
222
        }
216
223
 
217
 
        pThis->fd = open((char*)pThis->pszCurrFName, iFlags, pThis->tOpenMode);
218
 
        DBGPRINTF("file '%s' opened as #%d with mode %d\n", pThis->pszCurrFName, pThis->fd, pThis->tOpenMode);
 
224
        pThis->fd = open((char*)pThis->pszCurrFName, iFlags | O_LARGEFILE, pThis->tOpenMode);
 
225
        DBGPRINTF("file '%s' opened as #%d with mode %d\n", pThis->pszCurrFName,
 
226
                  pThis->fd, (int) pThis->tOpenMode);
219
227
        if(pThis->fd == -1) {
220
228
                char errStr[1024];
221
229
                int err = errno;
260
268
                                    pThis->pszFName, pThis->lenFName, pThis->iCurrFNum, pThis->iFileNumDigits));
261
269
        } else {
262
270
                if(pThis->pszDir == NULL) {
263
 
                        if((pThis->pszCurrFName = (uchar*) strdup((char*) pThis->pszFName)) == NULL)
 
271
                        if((pThis->pszCurrFName = ustrdup(pThis->pszFName)) == NULL)
264
272
                                ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
265
273
                } else {
266
274
                        CHKiRet(genFileName(&pThis->pszCurrFName, pThis->pszDir, pThis->lenDir,
393
401
 * If we are monitoring a file, someone may have rotated it. In this case, we
394
402
 * also need to close it and reopen it under the same name.
395
403
 * rgerhards, 2008-02-13
 
404
 * The previous code also did a check for file truncation, in which case the
 
405
 * file was considered rewritten. However, this potential border case turned
 
406
 * out to be a big trouble spot on busy systems. It caused massive message
 
407
 * duplication (I guess stat() can return a too-low number under some
 
408
 * circumstances). So starting as of now, we only check the inode number and
 
409
 * a file change is detected only if the inode changes. -- rgerhards, 2011-01-10
396
410
 */
397
411
static rsRetVal
398
412
strmHandleEOFMonitor(strm_t *pThis)
402
416
        struct stat statName;
403
417
 
404
418
        ISOBJ_TYPE_assert(pThis, strm);
405
 
        /* find inodes of both current descriptor as well as file now in file
406
 
         * system. If they are different, the file has been rotated (or
407
 
         * otherwise rewritten). We also check the size, because the inode
408
 
         * does not change if the file is truncated (this, BTW, is also a case
409
 
         * where we actually loose log lines, because we can not do anything
410
 
         * against truncation...). We do NOT rely on the time of last
411
 
         * modificaton because that may not be available under all
412
 
         * circumstances. -- rgerhards, 2008-02-13
413
 
         */
414
419
        if(fstat(pThis->fd, &statOpen) == -1)
415
420
                ABORT_FINALIZE(RS_RET_IO_ERROR);
416
421
        if(stat((char*) pThis->pszCurrFName, &statName) == -1)
417
422
                ABORT_FINALIZE(RS_RET_IO_ERROR);
418
 
        if(statOpen.st_ino == statName.st_ino && pThis->iCurrOffs == statName.st_size) {
 
423
        DBGPRINTF("stream checking for file change on '%s', inode %u/%u",
 
424
          pThis->pszCurrFName, (unsigned) statOpen.st_ino,
 
425
          (unsigned) statName.st_ino);
 
426
        if(statOpen.st_ino == statName.st_ino) {
419
427
                ABORT_FINALIZE(RS_RET_EOF);
420
428
        } else {
421
429
                /* we had a file change! */
 
430
                DBGPRINTF("we had a file change on '%s'\n", pThis->pszCurrFName);
422
431
                CHKiRet(strmCloseFile(pThis));
423
432
                CHKiRet(strmOpenFile(pThis));
424
433
        }
553
562
        return RS_RET_OK;
554
563
}
555
564
 
556
 
 
557
 
/* read a line from a strm file. A line is terminated by LF. The LF is read, but it
558
 
 * is not returned in the buffer (it is discared). The caller is responsible for
559
 
 * destruction of the returned CStr object! -- rgerhards, 2008-01-07
560
 
 * rgerhards, 2008-03-27: I now use the ppCStr directly, without any interim
561
 
 * string pointer. The reason is that this function my be called by inputs, which
562
 
 * are pthread_killed() upon termination. So if we use their native pointer, they
563
 
 * can cleanup (but only then).
 
565
/* read a 'paragraph' from a strm file.
 
566
 * A paragraph may be terminated by a LF, by a LFLF, or by LF<not whitespace> depending on the option set.
 
567
 * The termination LF characters are read, but are
 
568
 * not returned in the buffer (it is discared). The caller is responsible for
 
569
 * destruction of the returned CStr object! -- dlang 2010-12-13
564
570
 */
565
571
static rsRetVal
566
 
strmReadLine(strm_t *pThis, cstr_t **ppCStr)
 
572
strmReadLine(strm_t *pThis, cstr_t **ppCStr, int mode)
567
573
{
568
 
        DEFiRet;
569
 
        uchar c;
570
 
 
571
 
        ASSERT(pThis != NULL);
572
 
        ASSERT(ppCStr != NULL);
573
 
 
574
 
        CHKiRet(cstrConstruct(ppCStr));
575
 
 
576
 
        /* now read the line */
577
 
        CHKiRet(strmReadChar(pThis, &c));
578
 
        while(c != '\n') {
579
 
                CHKiRet(cstrAppendChar(*ppCStr, c));
580
 
                CHKiRet(strmReadChar(pThis, &c));
581
 
        }
582
 
        CHKiRet(cstrFinalize(*ppCStr));
 
574
        /* mode = 0 single line mode (equivalent to ReadLine)
 
575
         * mode = 1 LFLF mode (paragraph, blank line between entries)
 
576
         * mode = 2 LF <not whitespace> mode, a log line starts at the beginning of a line, but following lines that are indented are part of the same log entry
 
577
         *  This modal interface is not nearly as flexible as being able to define a regex for when a new record starts, but it's also not nearly as hard (or as slow) to implement
 
578
         */
 
579
        DEFiRet;
 
580
        uchar c;
 
581
        uchar finished;
 
582
 
 
583
        ASSERT(pThis != NULL);
 
584
        ASSERT(ppCStr != NULL);
 
585
 
 
586
        CHKiRet(cstrConstruct(ppCStr));
 
587
 
 
588
        /* now read the line */
 
589
        CHKiRet(strmReadChar(pThis, &c));
 
590
        if (mode == 0){
 
591
                while(c != '\n') {
 
592
                        CHKiRet(cstrAppendChar(*ppCStr, c));
 
593
                        CHKiRet(strmReadChar(pThis, &c));
 
594
                }
 
595
                CHKiRet(cstrFinalize(*ppCStr));
 
596
        }
 
597
        if (mode == 1){
 
598
                finished=0;
 
599
                while(finished == 0){
 
600
                        if(c != '\n') {
 
601
                                CHKiRet(cstrAppendChar(*ppCStr, c));
 
602
                                CHKiRet(strmReadChar(pThis, &c));
 
603
                        } else {
 
604
                                if ((((*ppCStr)->iStrLen) > 0) ){
 
605
                                        if ((*ppCStr)->pBuf[(*ppCStr)->iStrLen -1 ] == '\n'){
 
606
                                                rsCStrTruncate(*ppCStr,1); /* remove the prior newline */
 
607
                                                finished=1;
 
608
                                        } else {
 
609
                                                CHKiRet(cstrAppendChar(*ppCStr, c));
 
610
                                                CHKiRet(strmReadChar(pThis, &c));
 
611
                                        }
 
612
                                } else {
 
613
                                        finished=1;  /* this is a blank line, a \n with nothing since the last complete record */
 
614
                                }
 
615
                        }
 
616
                }
 
617
                CHKiRet(cstrFinalize(*ppCStr));
 
618
        }
 
619
        if (mode == 2){
 
620
/* indented follow-up lines */
 
621
                finished=0;
 
622
                while(finished == 0){
 
623
                        if ((*ppCStr)->iStrLen == 0){
 
624
                                if(c != '\n') {
 
625
/* nothing in the buffer, and it's not a newline, add it to the buffer */
 
626
                                        CHKiRet(cstrAppendChar(*ppCStr, c));
 
627
                                        CHKiRet(strmReadChar(pThis, &c));
 
628
                                } else {
 
629
                                        finished=1;  /* this is a blank line, a \n with nothing since the last complete record */
 
630
                                }
 
631
                        } else {
 
632
                                if ((*ppCStr)->pBuf[(*ppCStr)->iStrLen -1 ] != '\n'){
 
633
/* not the first character after a newline, add it to the buffer */
 
634
                                        CHKiRet(cstrAppendChar(*ppCStr, c));
 
635
                                        CHKiRet(strmReadChar(pThis, &c));
 
636
                                } else {
 
637
                                        if ((c == ' ') || (c == '\t')){
 
638
                                                CHKiRet(cstrAppendChar(*ppCStr, c));
 
639
                                                CHKiRet(strmReadChar(pThis, &c));
 
640
                                        } else {
 
641
/* clean things up by putting the character we just read back into the input buffer and removing the LF character that is currently at the end of the output string */
 
642
                                                CHKiRet(strmUnreadChar(pThis, c));
 
643
                                                rsCStrTruncate(*ppCStr,1);
 
644
                                                finished=1;
 
645
                                        }
 
646
                                }
 
647
                        }
 
648
                }
 
649
                CHKiRet(cstrFinalize(*ppCStr));
 
650
        }
583
651
 
584
652
finalize_it:
585
 
        if(iRet != RS_RET_OK && *ppCStr != NULL)
586
 
                cstrDestruct(ppCStr);
 
653
        if(iRet != RS_RET_OK && *ppCStr != NULL)
 
654
                cstrDestruct(ppCStr);
587
655
 
588
 
        RETiRet;
 
656
        RETiRet;
589
657
}
590
658
 
591
659
 
625
693
                         * to make sure we can write out everything with a SINGLE api call!
626
694
                         * We add another 128 bytes to take care of the gzip header and "all eventualities".
627
695
                         */
628
 
                        CHKmalloc(pThis->pZipBuf = (Bytef*) malloc(sizeof(uchar) * (pThis->sIOBufSize + 128)));
 
696
                        CHKmalloc(pThis->pZipBuf = (Bytef*) MALLOC(sizeof(uchar) * (pThis->sIOBufSize + 128)));
629
697
                }
630
698
        }
631
699
 
657
725
                pthread_cond_init(&pThis->isEmpty, 0);
658
726
                pThis->iCnt = pThis->iEnq = pThis->iDeq = 0;
659
727
                for(i = 0 ; i < STREAM_ASYNC_NUMBUFS ; ++i) {
660
 
                        CHKmalloc(pThis->asyncBuf[i].pBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize));
 
728
                        CHKmalloc(pThis->asyncBuf[i].pBuf = (uchar*) MALLOC(sizeof(uchar) * pThis->sIOBufSize));
661
729
                }
662
730
                pThis->pIOBuf = pThis->asyncBuf[0].pBuf;
663
731
                pThis->bStopWriter = 0;
664
 
                if(pthread_create(&pThis->writerThreadID, NULL, asyncWriterThread, pThis) != 0)
 
732
                if(pthread_create(&pThis->writerThreadID,
 
733
#ifdef HAVE_PTHREAD_SETSCHEDPARAM
 
734
                                  &default_thread_attr,
 
735
#else
 
736
                                  NULL,
 
737
#endif
 
738
                                  asyncWriterThread, pThis) != 0)
665
739
                        DBGPRINTF("ERROR: stream %p cold not create writer thread\n", pThis);
666
740
        } else {
667
741
                /* we work synchronously, so we need to alloc a fixed pIOBuf */
668
 
                CHKmalloc(pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize));
 
742
                CHKmalloc(pThis->pIOBuf = (uchar*) MALLOC(sizeof(uchar) * pThis->sIOBufSize));
669
743
        }
670
744
 
671
745
finalize_it:
923
997
{
924
998
        int iDeq;
925
999
        struct timespec t;
926
 
        bool bTimedOut = 0;
 
1000
        sbool bTimedOut = 0;
927
1001
        strm_t *pThis = (strm_t*) pPtr;
928
1002
        ISOBJ_TYPE_assert(pThis, strm);
929
1003
 
936
1010
 
937
1011
        while(1) { /* loop broken inside */
938
1012
                d_pthread_mutex_lock(&pThis->mut);
939
 
dbgprintf("XXX: asyncWriterThread iterating %s\n", pThis->pszFName);
940
1013
                while(pThis->iCnt == 0) {
941
1014
                        if(pThis->bStopWriter) {
942
1015
                                pthread_cond_broadcast(&pThis->isEmpty);
952
1025
                        bTimedOut = 0;
953
1026
                        timeoutComp(&t, pThis->iFlushInterval * 2000); /* *1000 millisconds */ // TODO: check the 2000?!?
954
1027
                        if(pThis->bDoTimedWait) {
955
 
dbgprintf("asyncWriter thread going to timeout sleep\n");
956
1028
                                if(pthread_cond_timedwait(&pThis->notEmpty, &pThis->mut, &t) != 0) {
957
1029
                                        int err = errno;
958
1030
                                        if(err == ETIMEDOUT) {
966
1038
                                        }
967
1039
                                }
968
1040
                        } else {
969
 
dbgprintf("asyncWriter thread going to eternal sleep\n");
970
1041
                                d_pthread_cond_wait(&pThis->notEmpty, &pThis->mut);
971
1042
                        }
972
 
dbgprintf("asyncWriter woke up\n");
973
1043
                }
974
1044
 
975
1045
                bTimedOut = 0; /* we may have timed out, but there *is* work to do... */
976
1046
 
977
1047
                iDeq = pThis->iDeq++ % STREAM_ASYNC_NUMBUFS;
978
 
dbgprintf("asyncWriter writes data\n");
979
1048
                doWriteInternal(pThis, pThis->asyncBuf[iDeq].pBuf, pThis->asyncBuf[iDeq].lenBuf);
980
1049
                // TODO: error check????? 2009-07-06
981
1050
 
1090
1159
{
1091
1160
        z_stream zstrm;
1092
1161
        int zRet;       /* zlib return state */
1093
 
        bool bzInitDone = FALSE;
 
1162
        sbool bzInitDone = FALSE;
1094
1163
        DEFiRet;
1095
1164
        assert(pThis != NULL);
1096
1165
        assert(pBuf != NULL);
1188
1257
 * is invalidated.
1189
1258
 * rgerhards, 2008-01-12
1190
1259
 */
1191
 
static rsRetVal strmSeek(strm_t *pThis, off_t offs)
 
1260
static rsRetVal strmSeek(strm_t *pThis, off64_t offs)
1192
1261
{
1193
1262
        DEFiRet;
1194
1263
 
1198
1267
                strmOpenFile(pThis);
1199
1268
        else
1200
1269
                strmFlushInternal(pThis);
1201
 
        int i;
1202
 
        DBGOPRINT((obj_t*) pThis, "file %d seek, pos %ld\n", pThis->fd, (long) offs);
1203
 
        i = lseek(pThis->fd, offs, SEEK_SET); // TODO: check error!
 
1270
        long long i;
 
1271
        DBGOPRINT((obj_t*) pThis, "file %d seek, pos %llu\n", pThis->fd, (long long unsigned) offs);
 
1272
        i = lseek64(pThis->fd, offs, SEEK_SET); // TODO: check error!
1204
1273
        pThis->iCurrOffs = offs; /* we are now at *this* offset */
1205
1274
        pThis->iBufPtr = 0; /* buffer invalidated */
1206
1275
 
1384
1453
        if(pThis->pszFName != NULL)
1385
1454
                free(pThis->pszFName);
1386
1455
 
1387
 
        if((pThis->pszFName = malloc(sizeof(uchar) * (iLenName + 1))) == NULL)
 
1456
        if((pThis->pszFName = MALLOC(sizeof(uchar) * (iLenName + 1))) == NULL)
1388
1457
                ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
1389
1458
 
1390
1459
        memcpy(pThis->pszFName, pszName, iLenName + 1); /* always think about the \0! */
1411
1480
        if(iLenDir < 1)
1412
1481
                ABORT_FINALIZE(RS_RET_FILE_PREFIX_MISSING);
1413
1482
 
1414
 
        CHKmalloc(pThis->pszDir = malloc(sizeof(uchar) * iLenDir + 1));
 
1483
        CHKmalloc(pThis->pszDir = MALLOC(sizeof(uchar) * (iLenDir + 1)));
1415
1484
 
1416
1485
        memcpy(pThis->pszDir, pszDir, iLenDir + 1); /* always think about the \0! */
1417
1486
        pThis->lenDir = iLenDir;
1477
1546
{
1478
1547
        DEFiRet;
1479
1548
        int i;
1480
 
        long l;
 
1549
        int64 l;
1481
1550
 
1482
1551
        ISOBJ_TYPE_assert(pThis, strm);
1483
1552
        ISOBJ_TYPE_assert(pStrm, strm);
1499
1568
        i = pThis->tOpenMode;
1500
1569
        objSerializeSCALAR_VAR(pStrm, tOpenMode, INT, i);
1501
1570
 
1502
 
        l = (long) pThis->iCurrOffs;
1503
 
        objSerializeSCALAR_VAR(pStrm, iCurrOffs, LONG, l);
 
1571
        l = pThis->iCurrOffs;
 
1572
        objSerializeSCALAR_VAR(pStrm, iCurrOffs, INT64, l);
1504
1573
 
1505
1574
        CHKiRet(obj.EndSerialize(pStrm));
1506
1575
 
1509
1578
}
1510
1579
 
1511
1580
 
 
1581
/* duplicate a stream object excluding dynamic properties. This function is
 
1582
 * primarily meant to provide a duplicate that later on can be used to access
 
1583
 * the data. This is needed, for example, for a restart of the disk queue.
 
1584
 * Note that ConstructFinalize() is NOT called. So our caller may change some
 
1585
 * properties before finalizing things.
 
1586
 * rgerhards, 2009-05-26
 
1587
 */
 
1588
rsRetVal
 
1589
strmDup(strm_t *pThis, strm_t **ppNew)
 
1590
{
 
1591
        strm_t *pNew = NULL;
 
1592
        DEFiRet;
 
1593
 
 
1594
        ISOBJ_TYPE_assert(pThis, strm);
 
1595
        assert(ppNew != NULL);
 
1596
 
 
1597
        CHKiRet(strmConstruct(&pNew));
 
1598
        pNew->sType = pThis->sType;
 
1599
        pNew->iCurrFNum = pThis->iCurrFNum;
 
1600
        CHKmalloc(pNew->pszFName = ustrdup(pThis->pszFName));
 
1601
        pNew->lenFName = pThis->lenFName;
 
1602
        CHKmalloc(pNew->pszDir = ustrdup(pThis->pszDir));
 
1603
        pNew->lenDir = pThis->lenDir;
 
1604
        pNew->tOperationsMode = pThis->tOperationsMode;
 
1605
        pNew->tOpenMode = pThis->tOpenMode;
 
1606
        pNew->iMaxFileSize = pThis->iMaxFileSize;
 
1607
        pNew->iMaxFiles = pThis->iMaxFiles;
 
1608
        pNew->iFileNumDigits = pThis->iFileNumDigits;
 
1609
        pNew->bDeleteOnClose = pThis->bDeleteOnClose;
 
1610
        pNew->iCurrOffs = pThis->iCurrOffs;
 
1611
        
 
1612
        *ppNew = pNew;
 
1613
        pNew = NULL;
 
1614
 
 
1615
finalize_it:
 
1616
        if(pNew != NULL)
 
1617
                strmDestruct(&pNew);
 
1618
 
 
1619
        RETiRet;
 
1620
}
1512
1621
 
1513
1622
/* set a user write-counter. This counter is initialized to zero and
1514
1623
 * receives the number of bytes written. It is accurate only after a
1624
1733
        pIf->RecordEnd = strmRecordEnd;
1625
1734
        pIf->Serialize = strmSerialize;
1626
1735
        pIf->GetCurrOffset = strmGetCurrOffset;
 
1736
        pIf->Dup = strmDup;
1627
1737
        pIf->SetWCntr = strmSetWCntr;
1628
1738
        /* set methods */
1629
1739
        pIf->SetbDeleteOnClose = strmSetbDeleteOnClose;