2
* This is an output module to support Hadoop's HDFS.
4
* NOTE: read comments in module-template.h to understand how this file
7
* Copyright 2010 Rainer Gerhards and Adiscon GmbH.
9
* This program is free software; you can redistribute it and/or
10
* modify it under the terms of the GNU General Public License
11
* as published by the Free Software Foundation; either version 2
12
* of the License, or (at your option) any later version.
14
* This program is distributed in the hope that it will be useful,
15
* but WITHOUT ANY WARRANTY; without even the implied warranty of
16
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17
* GNU General Public License for more details.
19
* You should have received a copy of the GNU General Public License
20
* along with this program; if not, write to the Free Software
21
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
23
* A copy of the GPL can be found in the file "COPYING" in this distribution.
41
#include "syslogd-types.h"
45
#include "cfsysline.h"
46
#include "module-template.h"
47
#include "unicode-helper.h"
49
#include "hashtable.h"
50
#include "hashtable_itr.h"
54
/* internal structures
60
static struct hashtable *files; /* holds all file objects that we know */
62
/* globals for default values */
63
static uchar *fileName = NULL;
64
static uchar *hdfsHost = NULL;
65
static uchar *dfltTplName = NULL; /* default template name to use */
67
/* end globals for default values */
80
typedef struct _instanceData {
84
/* forward definitions (down here, need data types) */
85
static inline rsRetVal fileClose(file_t *pFile);
87
BEGINisCompatibleWithFeature
88
CODESTARTisCompatibleWithFeature
89
if(eFeat == sFEATURERepeatedMsgReduction)
91
ENDisCompatibleWithFeature
95
CODESTARTdbgPrintInstInfo
96
printf("omhdfs: file:%s", pData->pFile->name);
100
/* note that hdfsFileExists() does not work, so we did our
101
* own function to see if a pathname exists. Returns 0 if the
102
* file does not exist, something else otherwise. Note that
103
* we can also check a directory (if that matters...)
106
HDFSFileExists(hdfsFS fs, uchar *name)
111
info = hdfsGetPathInfo(fs, (char*)name);
112
/* if things go wrong, we assume it is because the file
113
* does not exist. We do not get too much information...
119
hdfsFreeFileInfo(info, 1);
124
static inline rsRetVal
125
HDFSmkdir(hdfsFS fs, uchar *name)
128
if(hdfsCreateDirectory(fs, (char*)name) == -1)
129
ABORT_FINALIZE(RS_RET_ERR);
136
/* ---BEGIN FILE OBJECT---------------------------------------------------- */
137
/* This code handles the "file object". This is split from the actual
138
* instance data, because several instances may write into the same file.
139
* If so, we need to use a single object, and also synchronize their writes.
140
* So we keep the file object separately, and just stick a reference into
144
static inline rsRetVal
145
fileObjConstruct(file_t **ppFile)
150
CHKmalloc(pFile = malloc(sizeof(file_t)));
152
pFile->hdfsHost = NULL;
162
fileObjAddUser(file_t *pFile)
164
/* init mutex only when second user is added */
166
if(pFile->nUsers == 2)
167
pthread_mutex_init(&pFile->mut, NULL);
168
DBGPRINTF("omhdfs: file %s now being used by %d actions\n", pFile->name, pFile->nUsers);
172
fileObjDestruct(file_t **ppFile)
174
file_t *pFile = *ppFile;
175
if(pFile->nUsers > 1)
176
pthread_mutex_destroy(&pFile->mut);
179
free((char*)pFile->hdfsHost);
186
/* check, and potentially create, all names inside a path */
188
filePrepare(file_t *pFile)
195
if(HDFSFileExists(pFile->fs, pFile->name))
198
/* file does not exist, create it (and, if necessary, parent directories) */
199
if(1) { // check if bCreateDirs
200
len = ustrlen(pFile->name) + 1;
201
CHKmalloc(pszWork = MALLOC(sizeof(uchar) * len));
202
memcpy(pszWork, pFile->name, len);
203
for(p = pszWork+1 ; *p ; p++)
205
/* temporarily terminate string, create dir and go on */
207
if(!HDFSFileExists(pFile->fs, pszWork)) {
208
CHKiRet(HDFSmkdir(pFile->fs, pszWork));
221
/* this function is to be used as destructor for the
225
fileObjDestruct4Hashtable(void *ptr)
227
file_t *pFile = (file_t*) ptr;
228
fileObjDestruct(&pFile);
232
static inline rsRetVal
233
fileOpen(file_t *pFile)
237
assert(pFile->fh == NULL);
238
if(pFile->nUsers > 1)
239
d_pthread_mutex_lock(&pFile->mut);
241
DBGPRINTF("omhdfs: try to connect to HDFS at host '%s', port %d\n",
242
pFile->hdfsHost, pFile->hdfsPort);
243
pFile->fs = hdfsConnect(pFile->hdfsHost, pFile->hdfsPort);
244
if(pFile->fs == NULL) {
245
DBGPRINTF("omhdfs: error can not connect to hdfs\n");
246
ABORT_FINALIZE(RS_RET_SUSPENDED);
249
CHKiRet(filePrepare(pFile));
251
pFile->fh = hdfsOpenFile(pFile->fs, (char*)pFile->name, O_WRONLY|O_APPEND, 0, 0, 0);
252
if(pFile->fh == NULL) {
253
/* maybe the file does not exist, so we try to create it now.
254
* Note that we can not use hdfsExists() because of a deficit in
255
* it: https://issues.apache.org/jira/browse/HDFS-1154
256
* As of my testing, libhdfs at least seems to return ENOENT if
257
* the file does not exist.
259
if(errno == ENOENT) {
260
DBGPRINTF("omhdfs: ENOENT trying to append to '%s', now trying create\n",
262
pFile->fh = hdfsOpenFile(pFile->fs, (char*)pFile->name, O_WRONLY|O_CREAT, 0, 0, 0);
265
if(pFile->fh == NULL) {
266
DBGPRINTF("omhdfs: failed to open %s for writing!\n", pFile->name);
267
ABORT_FINALIZE(RS_RET_SUSPENDED);
271
if(pFile->nUsers > 1)
272
d_pthread_mutex_unlock(&pFile->mut);
277
static inline rsRetVal
278
fileWrite(file_t *pFile, uchar *buf)
283
if(pFile->nUsers > 1)
284
d_pthread_mutex_lock(&pFile->mut);
286
/* open file if not open. This must be done *here* and while mutex-protected
287
* because of HUP handling (which is async to normal processing!).
289
if(pFile->fh == NULL) {
291
if(pFile->fh == NULL) {
292
ABORT_FINALIZE(RS_RET_SUSPENDED);
296
lenWrite = strlen((char*) buf);
297
tSize num_written_bytes = hdfsWrite(pFile->fs, pFile->fh, buf, lenWrite);
298
if((unsigned) num_written_bytes != lenWrite) {
299
errmsg.LogError(errno, RS_RET_ERR_HDFS_WRITE, "omhdfs: failed to write %s, expected %lu bytes, "
300
"written %lu\n", pFile->name, (unsigned long) lenWrite,
301
(unsigned long) num_written_bytes);
302
ABORT_FINALIZE(RS_RET_SUSPENDED);
306
if(pFile->nUsers > 1)
307
d_pthread_mutex_unlock(&pFile->mut);
312
static inline rsRetVal
313
fileClose(file_t *pFile)
317
if(pFile->fh == NULL)
320
if(pFile->nUsers > 1)
321
d_pthread_mutex_lock(&pFile->mut);
323
hdfsCloseFile(pFile->fs, pFile->fh);
326
if(pFile->nUsers > 1)
327
d_pthread_mutex_unlock(&pFile->mut);
333
/* ---END FILE OBJECT---------------------------------------------------- */
337
CODESTARTcreateInstance
343
CODESTARTfreeInstance
344
if(pData->pFile != NULL)
345
fileObjDestruct(&pData->pFile);
351
fileClose(pData->pFile);
352
fileOpen(pData->pFile);
353
if(pData->pFile->fh == NULL){
354
dbgprintf("omhdfs: tried to resume file %s, but still no luck...\n",
356
iRet = RS_RET_SUSPENDED;
362
DBGPRINTF("omuxsock: action to to write to %s\n", pData->pFile->name);
363
iRet = fileWrite(pData->pFile, ppString[0]);
367
BEGINparseSelectorAct
371
CODESTARTparseSelectorAct
373
/* first check if this config line is actually for us */
374
if(strncmp((char*) p, ":omhdfs:", sizeof(":omhdfs:") - 1)) {
375
ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED);
378
/* ok, if we reach this point, we have something for us */
379
p += sizeof(":omhdfs:") - 1; /* eat indicator sequence (-1 because of '\0'!) */
380
CHKiRet(createInstance(&pData));
381
CODE_STD_STRING_REQUESTparseSelectorAct(1)
382
CHKiRet(cflineParseTemplateName(&p, *ppOMSR, 0, 0,
383
(dfltTplName == NULL) ? (uchar*)"RSYSLOG_FileFormat" : dfltTplName));
385
if(fileName == NULL) {
386
errmsg.LogError(0, RS_RET_ERR_HDFS_OPEN, "omhdfs: no file name specified, can not continue");
387
ABORT_FINALIZE(RS_RET_FILE_NOT_SPECIFIED);
390
pFile = hashtable_search(files, fileName);
392
/* we need a new file object, this one not seen before */
393
CHKiRet(fileObjConstruct(&pFile));
394
CHKmalloc(pFile->name = fileName);
395
CHKmalloc(keybuf = ustrdup(fileName));
396
fileName = NULL; /* re-set, data passed to file object */
397
CHKmalloc(pFile->hdfsHost = strdup((hdfsHost == NULL) ? "default" : (char*) hdfsHost));
398
pFile->hdfsPort = hdfsPort;
400
if(pFile->fh == NULL){
401
errmsg.LogError(0, RS_RET_ERR_HDFS_OPEN, "omhdfs: failed to open %s - "
402
"retrying later", pFile->name);
403
iRet = RS_RET_SUSPENDED;
405
r = hashtable_insert(files, keybuf, pFile);
407
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
409
fileObjAddUser(pFile);
410
pData->pFile = pFile;
412
CODE_STD_FINALIZERparseSelectorAct
418
struct hashtable_itr *itr;
420
DBGPRINTF("omhdfs: HUP received (file count %d)\n", hashtable_count(files));
421
/* Iterator constructor only returns a valid iterator if
422
* the hashtable is not empty */
423
itr = hashtable_iterator(files);
424
if(hashtable_count(files) > 0)
427
pFile = (file_t *) hashtable_iterator_value(itr);
429
DBGPRINTF("omhdfs: HUP, closing file %s\n", pFile->name);
430
} while (hashtable_iterator_advance(itr));
435
/* Reset config variables for this module to default values.
436
* rgerhards, 2007-07-17
438
static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal)
448
objRelease(errmsg, CORE_COMPONENT);
450
hashtable_destroy(files, 1); /* 1 => free all values automatically */
456
CODEqueryEtryPt_STD_OMOD_QUERIES
457
CODEqueryEtryPt_doHUP
463
*ipIFVersProvided = CURR_MOD_IF_VERSION;
464
CODEmodInit_QueryRegCFSLineHdlr
465
CHKiRet(objUse(errmsg, CORE_COMPONENT));
466
CHKmalloc(files = create_hashtable(20, hash_from_string, key_equals_string,
467
fileObjDestruct4Hashtable));
469
CHKiRet(regCfSysLineHdlr((uchar *)"omhdfsfilename", 0, eCmdHdlrGetWord, NULL, &fileName, NULL));
470
CHKiRet(regCfSysLineHdlr((uchar *)"omhdfshost", 0, eCmdHdlrGetWord, NULL, &hdfsHost, NULL));
471
CHKiRet(regCfSysLineHdlr((uchar *)"omhdfsport", 0, eCmdHdlrInt, NULL, &hdfsPort, NULL));
472
CHKiRet(regCfSysLineHdlr((uchar *)"omhdfsdefaulttemplate", 0, eCmdHdlrGetWord, NULL, &dfltTplName, NULL));
473
CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID));
474
CODEmodInit_QueryRegCFSLineHdlr