393
401
* If we are monitoring a file, someone may have rotated it. In this case, we
394
402
* also need to close it and reopen it under the same name.
395
403
* rgerhards, 2008-02-13
404
* The previous code also did a check for file truncation, in which case the
405
* file was considered rewritten. However, this potential border case turned
406
* out to be a big trouble spot on busy systems. It caused massive message
407
* duplication (I guess stat() can return a too-low number under some
408
* circumstances). So starting as of now, we only check the inode number and
409
* a file change is detected only if the inode changes. -- rgerhards, 2011-01-10
398
412
strmHandleEOFMonitor(strm_t *pThis)
402
416
struct stat statName;
404
418
ISOBJ_TYPE_assert(pThis, strm);
405
/* find inodes of both current descriptor as well as file now in file
406
* system. If they are different, the file has been rotated (or
407
* otherwise rewritten). We also check the size, because the inode
408
* does not change if the file is truncated (this, BTW, is also a case
409
* where we actually lose log lines, because we cannot do anything
410
* against truncation...). We do NOT rely on the time of last
411
* modification because that may not be available under all
412
* circumstances. -- rgerhards, 2008-02-13
414
419
if(fstat(pThis->fd, &statOpen) == -1)
415
420
ABORT_FINALIZE(RS_RET_IO_ERROR);
416
421
if(stat((char*) pThis->pszCurrFName, &statName) == -1)
417
422
ABORT_FINALIZE(RS_RET_IO_ERROR);
418
if(statOpen.st_ino == statName.st_ino && pThis->iCurrOffs == statName.st_size) {
423
DBGPRINTF("stream checking for file change on '%s', inode %u/%u",
424
pThis->pszCurrFName, (unsigned) statOpen.st_ino,
425
(unsigned) statName.st_ino);
426
if(statOpen.st_ino == statName.st_ino) {
419
427
ABORT_FINALIZE(RS_RET_EOF);
421
429
/* we had a file change! */
430
DBGPRINTF("we had a file change on '%s'\n", pThis->pszCurrFName);
422
431
CHKiRet(strmCloseFile(pThis));
423
432
CHKiRet(strmOpenFile(pThis));
553
562
return RS_RET_OK;
557
/* read a line from a strm file. A line is terminated by LF. The LF is read, but it
558
* is not returned in the buffer (it is discarded). The caller is responsible for
559
* destruction of the returned CStr object! -- rgerhards, 2008-01-07
560
* rgerhards, 2008-03-27: I now use the ppCStr directly, without any interim
561
* string pointer. The reason is that this function may be called by inputs, which
562
* are pthread_killed() upon termination. So if we use their native pointer, they
563
* can cleanup (but only then).
565
/* read a 'paragraph' from a strm file.
566
* A paragraph may be terminated by a LF, by a LFLF, or by LF<not whitespace> depending on the option set.
567
* The termination LF characters are read, but are
568
* not returned in the buffer (they are discarded). The caller is responsible for
569
* destruction of the returned CStr object! -- dlang 2010-12-13
566
strmReadLine(strm_t *pThis, cstr_t **ppCStr)
572
strmReadLine(strm_t *pThis, cstr_t **ppCStr, int mode)
571
ASSERT(pThis != NULL);
572
ASSERT(ppCStr != NULL);
574
CHKiRet(cstrConstruct(ppCStr));
576
/* now read the line */
577
CHKiRet(strmReadChar(pThis, &c));
579
CHKiRet(cstrAppendChar(*ppCStr, c));
580
CHKiRet(strmReadChar(pThis, &c));
582
CHKiRet(cstrFinalize(*ppCStr));
574
/* mode = 0 single line mode (equivalent to ReadLine)
575
* mode = 1 LFLF mode (paragraph, blank line between entries)
576
* mode = 2 LF <not whitespace> mode, a log line starts at the beginning of a line, but following lines that are indented are part of the same log entry
577
* This modal interface is not nearly as flexible as being able to define a regex for when a new record starts, but it's also not nearly as hard (or as slow) to implement
583
ASSERT(pThis != NULL);
584
ASSERT(ppCStr != NULL);
586
CHKiRet(cstrConstruct(ppCStr));
588
/* now read the line */
589
CHKiRet(strmReadChar(pThis, &c));
592
CHKiRet(cstrAppendChar(*ppCStr, c));
593
CHKiRet(strmReadChar(pThis, &c));
595
CHKiRet(cstrFinalize(*ppCStr));
599
while(finished == 0){
601
CHKiRet(cstrAppendChar(*ppCStr, c));
602
CHKiRet(strmReadChar(pThis, &c));
604
if ((((*ppCStr)->iStrLen) > 0) ){
605
if ((*ppCStr)->pBuf[(*ppCStr)->iStrLen -1 ] == '\n'){
606
rsCStrTruncate(*ppCStr,1); /* remove the prior newline */
609
CHKiRet(cstrAppendChar(*ppCStr, c));
610
CHKiRet(strmReadChar(pThis, &c));
613
finished=1; /* this is a blank line, a \n with nothing since the last complete record */
617
CHKiRet(cstrFinalize(*ppCStr));
620
/* indented follow-up lines */
622
while(finished == 0){
623
if ((*ppCStr)->iStrLen == 0){
625
/* nothing in the buffer, and it's not a newline, add it to the buffer */
626
CHKiRet(cstrAppendChar(*ppCStr, c));
627
CHKiRet(strmReadChar(pThis, &c));
629
finished=1; /* this is a blank line, a \n with nothing since the last complete record */
632
if ((*ppCStr)->pBuf[(*ppCStr)->iStrLen -1 ] != '\n'){
633
/* not the first character after a newline, add it to the buffer */
634
CHKiRet(cstrAppendChar(*ppCStr, c));
635
CHKiRet(strmReadChar(pThis, &c));
637
if ((c == ' ') || (c == '\t')){
638
CHKiRet(cstrAppendChar(*ppCStr, c));
639
CHKiRet(strmReadChar(pThis, &c));
641
/* clean things up by putting the character we just read back into the input buffer and removing the LF character that is currently at the end of the output string */
642
CHKiRet(strmUnreadChar(pThis, c));
643
rsCStrTruncate(*ppCStr,1);
649
CHKiRet(cstrFinalize(*ppCStr));
585
if(iRet != RS_RET_OK && *ppCStr != NULL)
586
cstrDestruct(ppCStr);
653
if(iRet != RS_RET_OK && *ppCStr != NULL)
654
cstrDestruct(ppCStr);
657
725
pthread_cond_init(&pThis->isEmpty, 0);
658
726
pThis->iCnt = pThis->iEnq = pThis->iDeq = 0;
659
727
for(i = 0 ; i < STREAM_ASYNC_NUMBUFS ; ++i) {
660
CHKmalloc(pThis->asyncBuf[i].pBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize));
728
CHKmalloc(pThis->asyncBuf[i].pBuf = (uchar*) MALLOC(sizeof(uchar) * pThis->sIOBufSize));
662
730
pThis->pIOBuf = pThis->asyncBuf[0].pBuf;
663
731
pThis->bStopWriter = 0;
664
if(pthread_create(&pThis->writerThreadID, NULL, asyncWriterThread, pThis) != 0)
732
if(pthread_create(&pThis->writerThreadID,
733
#ifdef HAVE_PTHREAD_SETSCHEDPARAM
734
&default_thread_attr,
738
asyncWriterThread, pThis) != 0)
665
739
DBGPRINTF("ERROR: stream %p cold not create writer thread\n", pThis);
667
741
/* we work synchronously, so we need to alloc a fixed pIOBuf */
668
CHKmalloc(pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize));
742
CHKmalloc(pThis->pIOBuf = (uchar*) MALLOC(sizeof(uchar) * pThis->sIOBufSize));
1198
1267
strmOpenFile(pThis);
1200
1269
strmFlushInternal(pThis);
1202
DBGOPRINT((obj_t*) pThis, "file %d seek, pos %ld\n", pThis->fd, (long) offs);
1203
i = lseek(pThis->fd, offs, SEEK_SET); // TODO: check error!
1271
DBGOPRINT((obj_t*) pThis, "file %d seek, pos %llu\n", pThis->fd, (long long unsigned) offs);
1272
i = lseek64(pThis->fd, offs, SEEK_SET); // TODO: check error!
1204
1273
pThis->iCurrOffs = offs; /* we are now at *this* offset */
1205
1274
pThis->iBufPtr = 0; /* buffer invalidated */
1581
/* duplicate a stream object excluding dynamic properties. This function is
1582
* primarily meant to provide a duplicate that later on can be used to access
1583
* the data. This is needed, for example, for a restart of the disk queue.
1584
* Note that ConstructFinalize() is NOT called. So our caller may change some
1585
* properties before finalizing things.
1586
* rgerhards, 2009-05-26
1589
strmDup(strm_t *pThis, strm_t **ppNew)
1591
strm_t *pNew = NULL;
1594
ISOBJ_TYPE_assert(pThis, strm);
1595
assert(ppNew != NULL);
1597
CHKiRet(strmConstruct(&pNew));
1598
pNew->sType = pThis->sType;
1599
pNew->iCurrFNum = pThis->iCurrFNum;
1600
CHKmalloc(pNew->pszFName = ustrdup(pThis->pszFName));
1601
pNew->lenFName = pThis->lenFName;
1602
CHKmalloc(pNew->pszDir = ustrdup(pThis->pszDir));
1603
pNew->lenDir = pThis->lenDir;
1604
pNew->tOperationsMode = pThis->tOperationsMode;
1605
pNew->tOpenMode = pThis->tOpenMode;
1606
pNew->iMaxFileSize = pThis->iMaxFileSize;
1607
pNew->iMaxFiles = pThis->iMaxFiles;
1608
pNew->iFileNumDigits = pThis->iFileNumDigits;
1609
pNew->bDeleteOnClose = pThis->bDeleteOnClose;
1610
pNew->iCurrOffs = pThis->iCurrOffs;
1617
strmDestruct(&pNew);
1513
1622
/* set a user write-counter. This counter is initialized to zero and
1514
1623
* receives the number of bytes written. It is accurate only after a