1
#!/usr/local/bin/python
10
# flow-rpt2rrd - convert flow-report output to rrd format
14
# work with time-series flow-report output
15
# relax 5 minute sample requirement.
20
def load_keyfile(fname):
23
line = f.readline().strip()
29
line = f.readline().strip()
31
return string.join(keys, ',')
35
# load a symbol table from a file in value symbol format, provide
36
# access methods findbyname and findbyval
43
def __init__(self,field):
46
__symbol_lookup = { 'ip-source-port' : 'tcp-port.sym',
47
'ip-destination-port' : 'tcp-port.sym',
48
'ip-protocol' : 'ip-prot.sym',
49
'source-as' : 'asn.sym',
50
'destination-as' : 'asn.sym',
51
'source-tag' : 'tag.sym',
52
'destination-tag' : 'tag.sym',
53
'ip-address-type' : 'ip-type.sym',
55
fname = "/usr/local/netflow/var/sym/%s" % __symbol_lookup[field]
57
line = f.readline().strip()
62
line = f.readline().strip()
66
# access by name, return value. If the name does not exist return the name.
68
def findbyname(self, name):
69
# Look the name up in self.sv; when it is absent, .get() falls back to
# returning the name argument itself instead of raising.
return self.sv.get(name,name)
72
# access by value, return name. If the value does not exist return the value.
74
def findbyval(self, val):
75
# Look the value up in self.vs; when it is absent, .get() falls back to
# returning the val argument itself instead of raising.
return self.vs.get(val,val)
81
# Read in output of flow-report, make suitable for rrd
83
# pickfields - pick flows,octets,packets for inclusion into new rrd
84
# pickkeys - pick keys for inclusion into new rrds.
85
# mapsym() - replace key values with symbols
86
# setrrd() - set rrd params
87
# convert(stream) - convert to rrd format
98
self.use_key_names = {}
99
self.use_key_names_special = {}
100
self.use_key_names_total = 0
101
self.use_fields = {'flows' : 1, 'octets' : 1, 'packets' : 1}
108
self.rrd_postfix = ''
109
self.field_names = {}
110
self.field_names2 = {}
116
self.records_processed = 0
118
def set_use_fields(self,f):
120
for i in string.split(f, ','):
121
self.use_fields[i] = 1
123
def set_use_key_names(self,f):
124
self.use_key_names = {}
125
self.use_key_names_special = {}
126
for i in string.split(f, ','):
127
if i[:6] == 'total_':
128
self.use_key_names_special[i] = 1
130
self.use_key_names[i] = 1
131
self.use_key_names_total += 1
133
def set_mapsym(self):
136
def set_debug(self, debug):
139
def set_verbose(self, verbose):
140
# Store the caller-supplied verbosity setting on the instance.
self.verbose = verbose
142
def setrrd(self, storage, path, postfix):
143
(self.rrd_5min, self.rrd_30min, self.rrd_2hr, self.rrd_1day) = \
144
string.split(storage,':')
146
self.rrd_postfix = postfix
148
def update_rrd(self, key, vals, use_fields_index):
150
# / in the key maps to - for files
151
key = key.replace('/','-')
153
# open an rrd, if it doesn't exist create it.
154
rrdFile = "%s/%s%s.rrd" % (self.rrd_path, key, self.rrd_postfix)
157
if not os.access(rrdFile, os.F_OK):
159
print "Creating RRD", rrdFile
162
t = str(int(self.start_time) - 300)
163
rrdParams.append('--start')
165
for i in use_fields_index.keys():
166
rrdParams.append("DS:%s:ABSOLUTE:600:U:U" % use_fields_index[i])
168
rrdParams.append('RRA:AVERAGE:0.5:1:%s' % self.rrd_5min)
169
rrdParams.append('RRA:MAX:0.5:1:%s' % self.rrd_5min)
171
rrdParams.append('RRA:AVERAGE:0.5:6:%s' % self.rrd_30min)
172
rrdParams.append('RRA:MAX:0.5:6:%s' % self.rrd_30min)
174
rrdParams.append('RRA:AVERAGE:0.5:24:%s' % self.rrd_2hr)
175
rrdParams.append('RRA:MAX:0.5:24:%s' % self.rrd_2hr)
177
rrdParams.append('RRA:AVERAGE:0.5:288:%s' % self.rrd_1day)
178
rrdParams.append('RRA:MAX:0.5:288:%s' % self.rrd_1day)
180
rrdtool.create(rrdFile, *rrdParams)
182
print >>sys.stderr, string.join(rrdParams,' ')
186
update = self.start_time
187
for i in use_fields_index.keys():
188
update = "%s:%s" % (update,vals[i])
190
print >>sys.stderr, "update", update
193
print "Updating RRD", rrdFile
195
rrdtool.update(rrdFile,update)
200
def convert(self, f):
203
line = f.readline().strip()
207
# report data starts after recn comment
208
if (not self.in_data) :
210
if line[:13] == '# first-flow:':
211
self.start_time = (string.split(line[14:]))[0]
213
# handle the totals record differently
214
if line[:53] == '# rec1: records,ignores,flows,octets,packets,duration':
215
tmp = string.split(line[8:], ',')
216
line = f.readline().strip()
217
tmp_use_fields_index = {}
218
tmp_splt = string.split(line, ',')
222
if self.use_key_names_special.get("total_%s" % i,0):
223
tmp_use_fields_index[x] = i
227
self.update_rrd('totals', tmp_splt, tmp_use_fields_index)
228
del tmp_splt, tmp_use_fields_index, i, x, ds
231
if line[:6] == '# recn':
234
# foreach element in field names
235
for i in string.split(line[8:],','):
237
# remove key designators
240
self.field_keys[self.field_total] = 1
242
self.field_vals[self.field_total] = 1
244
# store the field names
245
self.field_names[self.field_total] = i
246
self.field_names2[i] = self.field_total
248
self.field_total += 1
250
# start time must be set by now
251
if (self.start_time == 0):
252
raise ValueError, "Start time not found, make sure flow-report is including the header"
256
for i in self.field_keys.keys():
257
self.sym[i] = ftsym(self.field_names[i])
259
# convert use_fields to use_fields_index for easier access
260
self.use_fields_index = {}
261
for i in self.use_fields.keys():
262
if self.use_fields[i] and self.field_names2.get(i,'x') != 'x':
263
self.use_fields_index[self.field_names2[i]] = i
267
# if in the data area and not a comment, store it
268
if self.in_data and line [:1] != '#':
270
splt = string.split(line, ',')
272
# combine the key fields to form one key
274
for i in self.field_keys.keys():
275
# try a symbol table lookup
277
t = self.sym[i].findbyval(splt[i])
282
# done if all entries in key_names list have been stored.
283
if self.use_key_names_total:
284
if self.records_processed == self.use_key_names_total:
287
# if set, only allow specified keys
288
if self.use_key_names.get(k[1:],0) == 0:
289
line = f.readline().strip()
292
# mark this key as processed
293
self.use_key_names[k[1:]] |= 2
295
self.records_processed += 1
297
self.update_rrd(k[1:], splt, self.use_fields_index)
300
line = f.readline().strip()
302
# keys which were not available in the report also need to be
303
# updated with 0 values.
304
for i in xrange(len(splt)):
306
for x in self.use_key_names.keys():
307
if not (self.use_key_names[x] & 2):
308
self.update_rrd(x, splt, self.use_fields_index)
314
(opts,rags) = getopt.getopt(sys.argv[1:], "dhk:K:f:np:P:r:v")
317
opt_rrd_storage = "600:600:600:732"
319
opt_fields = 'flows,octets,packets'
334
opt_keys = load_keyfile(v)
348
print "Usage: flow-rpt2rrd [-nv] [-k keys] [-K keyfile] [-f fields]"
349
print " [-p rrd_path] [-P fname_postfix]"
350
print " [-r rrd_storage 5_min:30_min:2_hr:1_day ]"
354
print >>sys.stderr, "Keys must be defined with -k or -K."
360
ftrrd.setrrd(opt_rrd_storage, opt_rrd_path, opt_rrd_postfix)
361
ftrrd.set_use_key_names(opt_keys)
362
ftrrd.set_use_fields(opt_fields)
363
ftrrd.set_debug(opt_debug)
364
ftrrd.set_verbose(opt_verbose)
365
ftrrd.convert(sys.stdin)