~arges/ubuntu/quantal/rsyslog/fix-lp1059592

« back to all changes in this revision

Viewing changes to plugins/omhdfs/omhdfs.c

Tags: 5.7.3-1
* New upstream release.
* Upload to unstable.
* debian/patches/02-typo_fix_equation_sign.patch
  - Removed, merged upstream.
* debian/patches/03-atomic_operations.patch
  - Removed, merged upstream.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* omhdfs.c
 
2
 * This is an output module to support Hadoop's HDFS.
 
3
 *
 
4
 * NOTE: read comments in module-template.h to understand how this file
 
5
 *       works!
 
6
 *
 
7
 * Copyright 2010 Rainer Gerhards and Adiscon GmbH.
 
8
 *
 
9
 * This program is free software; you can redistribute it and/or
 
10
 * modify it under the terms of the GNU General Public License
 
11
 * as published by the Free Software Foundation; either version 2
 
12
 * of the License, or (at your option) any later version.
 
13
 *
 
14
 * This program is distributed in the hope that it will be useful,
 
15
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 
16
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
17
 * GNU General Public License for more details.
 
18
 *
 
19
 * You should have received a copy of the GNU General Public License
 
20
 * along with this program; if not, write to the Free Software
 
21
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
 
22
 *
 
23
 * A copy of the GPL can be found in the file "COPYING" in this distribution.
 
24
 */
 
25
 
 
26
#include "config.h"
 
27
#include "rsyslog.h"
 
28
#include <stdio.h>
 
29
#include <stdarg.h>
 
30
#include <stdlib.h>
 
31
#include <string.h>
 
32
#include <time.h>
 
33
#include <assert.h>
 
34
#include <errno.h>
 
35
#include <ctype.h>
 
36
#include <unistd.h>
 
37
#include <sys/file.h>
 
38
#include <pthread.h>
 
39
#include <hdfs.h>
 
40
 
 
41
#include "syslogd-types.h"
 
42
#include "srUtils.h"
 
43
#include "template.h"
 
44
#include "conf.h"
 
45
#include "cfsysline.h"
 
46
#include "module-template.h"
 
47
#include "unicode-helper.h"
 
48
#include "errmsg.h"
 
49
#include "hashtable.h"
 
50
#include "hashtable_itr.h"
 
51
 
 
52
MODULE_TYPE_OUTPUT
 
53
 
 
54
/* internal structures
 
55
 */
 
56
DEF_OMOD_STATIC_DATA
 
57
DEFobjCurrIf(errmsg)
 
58
 
 
59
/* global data */
 
60
static struct hashtable *files;         /* holds all file objects that we know */
 
61
 
 
62
/* globals for default values */
 
63
static uchar *fileName = NULL;  
 
64
static uchar *hdfsHost = NULL;  
 
65
static uchar *dfltTplName = NULL;       /* default template name to use */
 
66
int hdfsPort = 0;
 
67
/* end globals for default values */
 
68
 
 
69
typedef struct {
 
70
        uchar   *name;
 
71
        hdfsFS fs;
 
72
        hdfsFile fh;
 
73
        const char *hdfsHost;
 
74
        tPort hdfsPort;
 
75
        int nUsers;
 
76
        pthread_mutex_t mut;
 
77
} file_t;
 
78
 
 
79
 
 
80
typedef struct _instanceData {
 
81
        file_t *pFile;
 
82
} instanceData;
 
83
 
 
84
/* forward definitions (down here, need data types) */
 
85
static inline rsRetVal fileClose(file_t *pFile);
 
86
 
 
87
BEGINisCompatibleWithFeature
 
88
CODESTARTisCompatibleWithFeature
 
89
        if(eFeat == sFEATURERepeatedMsgReduction)
 
90
                iRet = RS_RET_OK;
 
91
ENDisCompatibleWithFeature
 
92
 
 
93
 
 
94
BEGINdbgPrintInstInfo
 
95
CODESTARTdbgPrintInstInfo
 
96
        printf("omhdfs: file:%s", pData->pFile->name);
 
97
ENDdbgPrintInstInfo
 
98
 
 
99
 
 
100
/* note that hdfsFileExists() does not work, so we did our
 
101
 * own function to see if a pathname exists. Returns 0 if the
 
102
 * file does not exists, something else otherwise. Note that
 
103
 * we can also check a directroy (if that matters...)
 
104
 */
 
105
static int
 
106
HDFSFileExists(hdfsFS fs, uchar *name)
 
107
{
 
108
        int r;
 
109
        hdfsFileInfo *info;
 
110
 
 
111
        info = hdfsGetPathInfo(fs, (char*)name);
 
112
        /* if things go wrong, we assume it is because the file
 
113
         * does not exist. We do not get too much information...
 
114
         */
 
115
        if(info == NULL) {
 
116
                r = 0;
 
117
        } else {
 
118
                r = 1;
 
119
                hdfsFreeFileInfo(info, 1);
 
120
        }
 
121
        return r;
 
122
}
 
123
 
 
124
static inline rsRetVal
 
125
HDFSmkdir(hdfsFS fs, uchar *name)
 
126
{
 
127
        DEFiRet;
 
128
        if(hdfsCreateDirectory(fs, (char*)name) == -1)
 
129
                ABORT_FINALIZE(RS_RET_ERR);
 
130
 
 
131
finalize_it:
 
132
        RETiRet;
 
133
}
 
134
 
 
135
 
 
136
/* ---BEGIN FILE OBJECT---------------------------------------------------- */
 
137
/* This code handles the "file object". This is split from the actual
 
138
 * instance data, because several instances may write into the same file.
 
139
 * If so, we need to use a single object, and also synchronize their writes.
 
140
 * So we keep the file object separately, and just stick a reference into
 
141
 * the instance data.
 
142
 */
 
143
 
 
144
static inline rsRetVal
 
145
fileObjConstruct(file_t **ppFile)
 
146
{
 
147
        file_t *pFile;
 
148
        DEFiRet;
 
149
 
 
150
        CHKmalloc(pFile = malloc(sizeof(file_t)));
 
151
        pFile->name = NULL;
 
152
        pFile->hdfsHost = NULL;
 
153
        pFile->fh = NULL;
 
154
        pFile->nUsers = 0;
 
155
 
 
156
        *ppFile = pFile;
 
157
finalize_it:
 
158
        RETiRet;
 
159
}
 
160
 
 
161
static inline void
 
162
fileObjAddUser(file_t *pFile)
 
163
{
 
164
        /* init mutex only when second user is added */
 
165
        ++pFile->nUsers;
 
166
        if(pFile->nUsers == 2)
 
167
                pthread_mutex_init(&pFile->mut, NULL);
 
168
        DBGPRINTF("omhdfs: file %s now being used by %d actions\n", pFile->name, pFile->nUsers);
 
169
}
 
170
 
 
171
static rsRetVal
 
172
fileObjDestruct(file_t **ppFile)
 
173
{
 
174
        file_t *pFile = *ppFile;
 
175
        if(pFile->nUsers > 1)
 
176
                pthread_mutex_destroy(&pFile->mut);
 
177
        fileClose(pFile);
 
178
        free(pFile->name);
 
179
        free((char*)pFile->hdfsHost);
 
180
        free(pFile->fh);
 
181
 
 
182
        return RS_RET_OK;
 
183
}
 
184
 
 
185
 
 
186
/* check, and potentially create, all names inside a path */
 
187
static rsRetVal
 
188
filePrepare(file_t *pFile)
 
189
{
 
190
        uchar *p;
 
191
        uchar *pszWork;
 
192
        size_t len;
 
193
        DEFiRet;
 
194
 
 
195
        if(HDFSFileExists(pFile->fs, pFile->name))
 
196
                FINALIZE;
 
197
 
 
198
        /* file does not exist, create it (and eventually parent directories */
 
199
        if(1) { // check if bCreateDirs
 
200
                len = ustrlen(pFile->name) + 1;
 
201
                CHKmalloc(pszWork = MALLOC(sizeof(uchar) * len));
 
202
                memcpy(pszWork, pFile->name, len);
 
203
                for(p = pszWork+1 ; *p ; p++)
 
204
                        if(*p == '/') {
 
205
                                /* temporarily terminate string, create dir and go on */
 
206
                                *p = '\0';
 
207
                                if(!HDFSFileExists(pFile->fs, pszWork)) {
 
208
                                        CHKiRet(HDFSmkdir(pFile->fs, pszWork));
 
209
                                }
 
210
                                *p = '/';
 
211
                        }
 
212
                free(pszWork);
 
213
                return 0;
 
214
        }
 
215
 
 
216
finalize_it:
 
217
        RETiRet;
 
218
}
 
219
 
 
220
 
 
221
/* this function is to be used as destructor for the
 
222
 * hash table code.
 
223
 */
 
224
static void
 
225
fileObjDestruct4Hashtable(void *ptr)
 
226
{
 
227
        file_t *pFile = (file_t*) ptr;
 
228
        fileObjDestruct(&pFile);
 
229
}
 
230
 
 
231
 
 
232
static inline rsRetVal
 
233
fileOpen(file_t *pFile)
 
234
{
 
235
        DEFiRet;
 
236
 
 
237
        assert(pFile->fh == NULL);
 
238
        if(pFile->nUsers > 1)
 
239
                d_pthread_mutex_lock(&pFile->mut);
 
240
 
 
241
        DBGPRINTF("omhdfs: try to connect to HDFS at host '%s', port %d\n",
 
242
                  pFile->hdfsHost, pFile->hdfsPort);
 
243
        pFile->fs = hdfsConnect(pFile->hdfsHost, pFile->hdfsPort);
 
244
        if(pFile->fs == NULL) {
 
245
                DBGPRINTF("omhdfs: error can not connect to hdfs\n");
 
246
                ABORT_FINALIZE(RS_RET_SUSPENDED);
 
247
        }
 
248
 
 
249
        CHKiRet(filePrepare(pFile));
 
250
 
 
251
        pFile->fh = hdfsOpenFile(pFile->fs, (char*)pFile->name, O_WRONLY|O_APPEND, 0, 0, 0);
 
252
        if(pFile->fh == NULL) {
 
253
                /* maybe the file does not exist, so we try to create it now.
 
254
                 * Note that we can not use hdfsExists() because of a deficit in
 
255
                 * it: https://issues.apache.org/jira/browse/HDFS-1154
 
256
                 * As of my testing, libhdfs at least seems to return ENOENT if
 
257
                 * the file does not exist.
 
258
                 */
 
259
                if(errno == ENOENT) {
 
260
                        DBGPRINTF("omhdfs: ENOENT trying to append to '%s', now trying create\n",
 
261
                                  pFile->name);
 
262
                        pFile->fh = hdfsOpenFile(pFile->fs, (char*)pFile->name, O_WRONLY|O_CREAT, 0, 0, 0);
 
263
                }
 
264
        }
 
265
        if(pFile->fh == NULL) {
 
266
                DBGPRINTF("omhdfs: failed to open %s for writing!\n", pFile->name);
 
267
                ABORT_FINALIZE(RS_RET_SUSPENDED);
 
268
        }
 
269
 
 
270
finalize_it:
 
271
        if(pFile->nUsers > 1)
 
272
                d_pthread_mutex_unlock(&pFile->mut);
 
273
        RETiRet;
 
274
}
 
275
 
 
276
 
 
277
static inline rsRetVal
 
278
fileWrite(file_t *pFile, uchar *buf)
 
279
{
 
280
        size_t lenWrite;
 
281
        DEFiRet;
 
282
 
 
283
        if(pFile->nUsers > 1)
 
284
                d_pthread_mutex_lock(&pFile->mut);
 
285
 
 
286
        /* open file if not open. This must be done *here* and while mutex-protected
 
287
         * because of HUP handling (which is async to normal processing!).
 
288
         */
 
289
        if(pFile->fh == NULL) {
 
290
                fileOpen(pFile);
 
291
                if(pFile->fh == NULL) {
 
292
                        ABORT_FINALIZE(RS_RET_SUSPENDED);
 
293
                }
 
294
        }
 
295
 
 
296
        lenWrite = strlen((char*) buf);
 
297
        tSize num_written_bytes = hdfsWrite(pFile->fs, pFile->fh, buf, lenWrite);
 
298
        if((unsigned) num_written_bytes != lenWrite) {
 
299
                errmsg.LogError(errno, RS_RET_ERR_HDFS_WRITE, "omhdfs: failed to write %s, expected %lu bytes, "
 
300
                                "written %lu\n", pFile->name, (unsigned long) lenWrite,
 
301
                                (unsigned long) num_written_bytes);
 
302
                ABORT_FINALIZE(RS_RET_SUSPENDED);
 
303
        }
 
304
 
 
305
finalize_it:
 
306
        if(pFile->nUsers > 1)
 
307
                d_pthread_mutex_unlock(&pFile->mut);
 
308
        RETiRet;
 
309
}
 
310
 
 
311
 
 
312
static inline rsRetVal
 
313
fileClose(file_t *pFile)
 
314
{
 
315
        DEFiRet;
 
316
 
 
317
        if(pFile->fh == NULL)
 
318
                FINALIZE;
 
319
 
 
320
        if(pFile->nUsers > 1)
 
321
                d_pthread_mutex_lock(&pFile->mut);
 
322
 
 
323
        hdfsCloseFile(pFile->fs, pFile->fh);
 
324
        pFile->fh = NULL;
 
325
 
 
326
        if(pFile->nUsers > 1)
 
327
                d_pthread_mutex_unlock(&pFile->mut);
 
328
 
 
329
finalize_it:
 
330
        RETiRet;
 
331
}
 
332
 
 
333
/* ---END FILE OBJECT---------------------------------------------------- */
 
334
 
 
335
 
 
336
BEGINcreateInstance
 
337
CODESTARTcreateInstance
 
338
        pData->pFile = NULL;
 
339
ENDcreateInstance
 
340
 
 
341
 
 
342
BEGINfreeInstance
 
343
CODESTARTfreeInstance
 
344
        if(pData->pFile != NULL)
 
345
                fileObjDestruct(&pData->pFile);
 
346
ENDfreeInstance
 
347
 
 
348
 
 
349
BEGINtryResume
 
350
CODESTARTtryResume
 
351
        fileClose(pData->pFile);
 
352
        fileOpen(pData->pFile);
 
353
        if(pData->pFile->fh == NULL){
 
354
                dbgprintf("omhdfs: tried to resume file %s, but still no luck...\n",
 
355
                          pData->pFile->name);
 
356
                iRet = RS_RET_SUSPENDED;
 
357
        }
 
358
ENDtryResume
 
359
 
 
360
BEGINdoAction
 
361
CODESTARTdoAction
 
362
        DBGPRINTF("omuxsock: action to to write to %s\n", pData->pFile->name);
 
363
        iRet = fileWrite(pData->pFile, ppString[0]);
 
364
ENDdoAction
 
365
 
 
366
 
 
367
BEGINparseSelectorAct
 
368
        file_t *pFile;
 
369
        int r;
 
370
        uchar *keybuf;
 
371
CODESTARTparseSelectorAct
 
372
 
 
373
        /* first check if this config line is actually for us */
 
374
        if(strncmp((char*) p, ":omhdfs:", sizeof(":omhdfs:") - 1)) {
 
375
                ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED);
 
376
        }
 
377
 
 
378
        /* ok, if we reach this point, we have something for us */
 
379
        p += sizeof(":omhdfs:") - 1; /* eat indicator sequence  (-1 because of '\0'!) */
 
380
        CHKiRet(createInstance(&pData));
 
381
        CODE_STD_STRING_REQUESTparseSelectorAct(1)
 
382
        CHKiRet(cflineParseTemplateName(&p, *ppOMSR, 0, 0,
 
383
                                       (dfltTplName == NULL) ? (uchar*)"RSYSLOG_FileFormat" : dfltTplName));
 
384
 
 
385
        if(fileName == NULL) {
 
386
                errmsg.LogError(0, RS_RET_ERR_HDFS_OPEN, "omhdfs: no file name specified, can not continue");
 
387
                ABORT_FINALIZE(RS_RET_FILE_NOT_SPECIFIED);
 
388
        }
 
389
 
 
390
        pFile = hashtable_search(files, fileName);
 
391
        if(pFile == NULL) {
 
392
                /* we need a new file object, this one not seen before */
 
393
                CHKiRet(fileObjConstruct(&pFile));
 
394
                CHKmalloc(pFile->name = fileName);
 
395
                CHKmalloc(keybuf = ustrdup(fileName));
 
396
                fileName = NULL; /* re-set, data passed to file object */
 
397
                CHKmalloc(pFile->hdfsHost = strdup((hdfsHost == NULL) ? "default" : (char*) hdfsHost));
 
398
                pFile->hdfsPort = hdfsPort;
 
399
                fileOpen(pFile);
 
400
                if(pFile->fh == NULL){
 
401
                        errmsg.LogError(0, RS_RET_ERR_HDFS_OPEN, "omhdfs: failed to open %s - "
 
402
                                        "retrying later", pFile->name);
 
403
                        iRet = RS_RET_SUSPENDED;
 
404
                }
 
405
                r = hashtable_insert(files, keybuf, pFile);
 
406
                if(r == 0)
 
407
                        ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
 
408
        }
 
409
        fileObjAddUser(pFile);
 
410
        pData->pFile = pFile;
 
411
 
 
412
CODE_STD_FINALIZERparseSelectorAct
 
413
ENDparseSelectorAct
 
414
 
 
415
 
 
416
BEGINdoHUP
 
417
    file_t *pFile;
 
418
    struct hashtable_itr *itr;
 
419
CODESTARTdoHUP
 
420
        DBGPRINTF("omhdfs: HUP received (file count %d)\n", hashtable_count(files));
 
421
        /* Iterator constructor only returns a valid iterator if
 
422
        * the hashtable is not empty */
 
423
        itr = hashtable_iterator(files);
 
424
        if(hashtable_count(files) > 0)
 
425
        {
 
426
                do {
 
427
                        pFile = (file_t *) hashtable_iterator_value(itr);
 
428
                        fileClose(pFile);
 
429
                        DBGPRINTF("omhdfs: HUP, closing file %s\n", pFile->name);
 
430
                } while (hashtable_iterator_advance(itr));
 
431
        }
 
432
ENDdoHUP
 
433
 
 
434
 
 
435
/* Reset config variables for this module to default values.
 
436
 * rgerhards, 2007-07-17
 
437
 */
 
438
static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal)
 
439
{
 
440
        hdfsHost = NULL;
 
441
        hdfsPort = 0;
 
442
        return RS_RET_OK;
 
443
}
 
444
 
 
445
 
 
446
BEGINmodExit
 
447
CODESTARTmodExit
 
448
        objRelease(errmsg, CORE_COMPONENT);
 
449
        if(files != NULL)
 
450
                hashtable_destroy(files, 1); /* 1 => free all values automatically */
 
451
ENDmodExit
 
452
 
 
453
 
 
454
BEGINqueryEtryPt
 
455
CODESTARTqueryEtryPt
 
456
CODEqueryEtryPt_STD_OMOD_QUERIES
 
457
CODEqueryEtryPt_doHUP
 
458
ENDqueryEtryPt
 
459
 
 
460
 
 
461
BEGINmodInit()
 
462
CODESTARTmodInit
 
463
        *ipIFVersProvided = CURR_MOD_IF_VERSION;
 
464
CODEmodInit_QueryRegCFSLineHdlr
 
465
        CHKiRet(objUse(errmsg, CORE_COMPONENT));
 
466
        CHKmalloc(files = create_hashtable(20, hash_from_string, key_equals_string,
 
467
                                           fileObjDestruct4Hashtable));
 
468
 
 
469
        CHKiRet(regCfSysLineHdlr((uchar *)"omhdfsfilename", 0, eCmdHdlrGetWord, NULL, &fileName, NULL));
 
470
        CHKiRet(regCfSysLineHdlr((uchar *)"omhdfshost", 0, eCmdHdlrGetWord, NULL, &hdfsHost, NULL));
 
471
        CHKiRet(regCfSysLineHdlr((uchar *)"omhdfsport", 0, eCmdHdlrInt, NULL, &hdfsPort, NULL));
 
472
        CHKiRet(regCfSysLineHdlr((uchar *)"omhdfsdefaulttemplate", 0, eCmdHdlrGetWord, NULL, &dfltTplName, NULL));
 
473
        CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID));
 
474
CODEmodInit_QueryRegCFSLineHdlr
 
475
ENDmodInit