4
class component(object):
    """Base node of the ETL flow graph.

    A component remembers the transitions that arrive at it (``trans_in``)
    and the ones that leave it (``trans_out``), and hands rows to the
    destination of every matching outgoing transition.

    NOTE(review): the text this class was rebuilt from had lost its
    indentation and several lines.  The ``event``/``start``/``stop`` method
    headers and the two transition lists were reconstructed from how the
    rest of the file uses them -- confirm against the upstream file.
    """

    def __init__(self, *args, **argv):
        """Create an unconnected component.

        Keyword ``job`` is stored on ``self.job``; it defaults to ``False``.
        Positional arguments are accepted and ignored.
        """
        # ``transition.__init__`` appends itself to these lists, so they
        # have to exist before any transition is created.
        self.trans_in = []
        self.trans_out = []
        self.job = argv.get('job', False)

    def event(self, name):
        """Return the (un-started) output generator for event *name*.

        ``output`` is a generator function, so nothing is executed until
        the returned object is iterated.  The callers inside this class
        discard it.
        """
        return self.output(event=name)

    def start(self):
        """Signal ``component_start``; returns ``True``.

        NOTE(review): the ``True`` result is reconstructed; the job runner
        appears to raise when ``start()`` is falsy.
        """
        self.event('component_start')
        return True

    def stop(self):
        """Signal ``component_end``; returns ``True``.

        NOTE(review): the method name ``stop`` is a reconstruction -- the
        header line was missing from the visible text.
        """
        self.event('component_end')
        return True

    def run(self, input_data=None):
        """Push *input_data* to the outgoing transitions.

        Returns a generator of ``(rows, destination)`` pairs.  ``None``
        (the default) stands for a fresh empty list, so no list object is
        shared between calls.
        """
        rows = [] if input_data is None else input_data
        self.event('component_run')
        return self.output(rows)

    def input(self, rows, transition=None):
        """Accept *rows* from an incoming transition and forward them.

        *transition* is accepted for interface compatibility and unused.
        """
        self.event('component_inputflow')
        return self.output(rows)

    def output(self, rows=None, channel=None, event=None):
        """Yield ``(rows, destination)`` for each matching transition.

        A transition matches when no *channel* was requested, or its
        ``channel_source`` equals *channel*, or it has no
        ``channel_source``; and additionally it is a ``data_transition``
        or a ``trigger_transition`` listening for *event*.
        """
        self.event('component_outputflow')
        for trans in self.trans_out:
            channel_matches = (
                (not channel)
                or (trans.channel_source == channel)
                or (not trans.channel_source)
            )
            if not channel_matches:
                continue
            if trans.type == 'data_transition' or (
                    trans.type == 'trigger_transition'
                    and event == trans.listen_event):
                yield rows, trans.destination
class csv_in(component):
    """Source component that reads rows from a CSV file."""

    def __init__(self, filename, *args, **argv):
        """Remember *filename*; other arguments go to ``component``."""
        super(csv_in, self).__init__(*args, **argv)
        self.filename = filename

    def run(self, data=None):
        """Read the CSV file and forward its rows downstream.

        Each row is a dict keyed by the file's header line
        (``csv.DictReader``).  The file is closed before returning.

        NOTE(review): the statements that consumed the reader were missing
        from the visible text; loading every row and forwarding that list
        in place of *data* is a reconstruction -- confirm upstream.
        """
        # ``with`` guarantees the handle is released even if parsing fails;
        # the rows are materialised first because the reader is lazy.
        with open(self.filename) as handle:
            rows = list(csv.DictReader(handle))
        return super(csv_in, self).run(rows)
class csv_out(component):
    """Sink component that writes the rows it receives to a CSV file."""

    def __init__(self, filename, *args, **argv):
        """Remember *filename*; other arguments go to ``component``."""
        super(csv_out, self).__init__(*args, **argv)
        self.filename = filename

    def run(self, rows=None):
        """Open the output file, write *rows*, and forward them.

        Returns whatever ``input`` returns.  The file is closed once the
        rows have been written; ``input`` writes eagerly, so the returned
        generator never touches the file.
        """
        # Mode kept as in the original ('wb+' is what the Python 2 csv
        # module expects).
        self.fp = open(self.filename, 'wb+')
        try:
            return self.input(rows)
        finally:
            self.fp.close()

    def input(self, rows=None, transition=None):
        """Write a header line and *rows* to ``self.fp``, then forward.

        The column names come from the keys of the first row.  Nothing is
        written for empty input, because there is no first row to take the
        names from.  *transition* mirrors the base-class signature and is
        unused.

        NOTE(review): only the header write was visible; the
        ``writerows`` call is a reconstruction of a line that appears to
        be missing -- confirm upstream.
        """
        rows = [] if rows is None else rows
        if rows:
            fieldnames = list(rows[0].keys())
            writer = csv.DictWriter(self.fp, fieldnames)
            # Header row: every column name mapped to itself.
            writer.writerow(dict(zip(fieldnames, fieldnames)))
            writer.writerows(rows)
        return super(csv_out, self).input(rows)
class sort(component):
    """Component that orders rows by one field before forwarding them."""

    def __init__(self, fieldname, *args, **argv):
        """Remember the key *fieldname*; other arguments go to ``component``."""
        self.fieldname = fieldname
        super(sort, self).__init__(*args, **argv)

    def run(self, rows=None, transition=None):
        """Forward *rows* sorted ascending by ``row[self.fieldname]``.

        The sorted rows are kept on ``self.data``.  A new list is built
        instead of sorting in place: ``component.output`` hands the same
        list object to every destination, so an in-place sort would
        reorder the rows seen by sibling branches.  *transition* is
        accepted and unused.
        """
        # The key-function form gives the same stable ordering as the
        # former cmp-based sort and works on both Python 2 and 3.
        self.data = sorted(
            [] if rows is None else rows,
            key=lambda row: row[self.fieldname],
        )
        return super(sort, self).run(self.data)
class logger(component):
    """Pass-through component that announces itself on standard output."""

    def __init__(self, name, *args, **argv):
        """Remember the display *name*; other arguments go to ``component``."""
        # ``run`` prints ``self.name``, so it has to be stored here.
        self.name = name
        super(logger, self).__init__(*args, **argv)

    def run(self, data=None):
        """Print the logger banner and forward *data* unchanged.

        NOTE(review): several lines between the banner and the ``return``
        were missing from the visible text (possibly a dump of the rows);
        only the visible behaviour is reproduced here.
        """
        rows = [] if data is None else data
        # Single-argument form yields the same text as the old
        # ``print ' Logger : ',self.name`` statement on Python 2 and 3.
        print(' Logger :  %s' % (self.name,))
        return super(logger, self).run(rows)
class transition(object):
    """Directed link between two components.

    Creating a transition registers it on both ends: it is appended to
    ``source.trans_out`` and to ``destination.trans_in``.
    """

    def __init__(self, source, destination, type='data_transition',
                 status='open', channel_source=None,
                 channel_destination=None, listen_event=None):
        """Connect *source* to *destination*.

        :param source: object with a ``trans_out`` list.
        :param destination: object with a ``trans_in`` list.
        :param type: ``component.output`` compares this against
            ``'data_transition'`` and ``'trigger_transition'``.
        :param status: stored on ``self.status``.
            NOTE(review): no use is visible in this file.
        :param channel_source: when set, ``component.output`` only follows
            this transition for a matching (or unspecified) channel.
        :param channel_destination: stored; no use is visible in this file.
        :param listen_event: event name a ``'trigger_transition'`` reacts
            to.  ``component.output`` reads this attribute, so it is always
            defined (default ``None``).
        """
        # These must be set before the registration below, which goes
        # through ``self.source``.
        self.type = type
        self.status = status
        self.source = source
        self.source.trans_out.append(self)
        self.destination = destination
        self.destination.trans_in.append(self)
        self.channel_source = channel_source
        self.channel_destination = channel_destination
        self.listen_event = listen_event
# NOTE(review): damaged fragment.  Indentation has been stripped and the bare
# integers between statements look like line numbers from the original file;
# the gaps in that numbering suggest statements are missing here.  The
# enclosing ``class`` header is not in view -- presumably this is the ``job``
# class that the script at the end of the file instantiates; confirm upstream.
#
# Constructor: stores the component the job starts from and the list of
# components handed in.
# NOTE(review): ``components=[]`` is a mutable default shared across calls.
def __init__(self,start_component,components=[]):
104
self.components=components
105
self.start_component=start_component
106
# NOTE(review): the body of this loop is not visible.
for component in components:
111
# NOTE(review): the header of the method that owns the following statements
# is not visible.
start_component=self.start_component
112
# Recursive helper: starts ``component``, runs it on ``data`` and calls itself
# for every (rows, destination) pair that ``run`` produces.
def _run(data,component):
115
res=component.start()
117
# NOTE(review): the condition guarding this raise is not visible; presumably
# it fires when ``start()`` returned a falsy value -- confirm.
raise Exception('not started component')
119
res_list=component.run(data)
120
for out_data,out_component in res_list:
121
_run(out_data,out_component)
126
# Kick off the traversal at the start component.
# NOTE(review): where ``data`` is bound for this call is not visible.
_run(data,start_component)
130
# Demo wiring: read partner.csv, log it, sort it, then write partner2.csv and
# log the sorted rows.
# NOTE(review): the bare integers between statements look like line numbers
# left over from an extraction; the gaps suggest statements are missing.
csv_in1= csv_in('partner.csv')
131
csv_out1= csv_out('partner2.csv')
133
log1=logger(name='Read Partner File')
134
log2=logger(name='After Sort')
136
# The reader feeds both the first logger and the sorter.
tran1=transition(csv_in1,log1)
137
# NOTE(review): ``sort1`` is not defined anywhere in the visible text; a
# ``sort1 = sort(<fieldname>)`` line appears to be missing above.
tran2=transition(csv_in1,sort1)
138
# The sorter feeds both the CSV writer and the second logger.
tran3=transition(sort1,csv_out1)
139
# NOTE(review): rebinding ``tran3`` drops the name for the previous
# transition; that transition stays registered because ``transition.__init__``
# appends it to the components' lists.  Possibly meant to be ``tran4``.
tran3=transition(sort1,log2)
141
# NOTE(review): only two of the created components are passed in the list
# here -- confirm that is intended.
job1=job(csv_in1,[csv_in1,log1])