6
# NOTE(review): this file is an extraction-damaged fragment.  The bare
# integers interleaved with the code are original source line numbers; a
# gap between consecutive numbers means source lines are missing here.
# Indentation was also stripped, so this text does not compile as-is.
# Comments are reviewer annotations only; no code has been altered.

# Base class of a dataflow node: a component consumes rows from incoming
# transitions (per-channel FIFO buffers in self.data) and routes rows
# yielded by its process() generator to its outgoing transitions.
class component(object):
8
def __init__(self,*args, **argv):
11
# Marks whether this node is a terminal/output node of the graph.
self.is_output = False
15
# (Re)binds the generator produced by self.process().
# NOTE(review): the `transition` parameter is unused in the visible
# lines; lines 16-17 are missing and presumably return self.generator.
def generator_get(self, transition):
18
self.generator = self.process()
21
# Pull rows for transition `trans`: drain its FIFO buffer, refilling by
# advancing the process() generator and fanning each (data, chan) pair
# out to matching outgoing transitions.
def channel_get(self, trans=None):
22
self.data.setdefault(trans, [])
23
gen = self.generator_get(trans) or []
26
# Yield the oldest buffered row for this transition (FIFO order).
yield self.data[trans].pop(0)
28
# NOTE(review): the `if` branch matching this `elif` is in the
# missing lines 24-25.
elif self.data[trans] is None:
30
# Python 2 generator protocol; process() yields (row, channel_name)
# tuples -- see the subclasses below.
data, chan = gen.next()
33
# Fan the row out to every outgoing transition whose source channel
# matches; an empty channel name acts as a wildcard on either side.
for t,t2 in self.trans_out:
34
if (t == chan) or (not t) or (not chan):
35
# Buffers are keyed by transition object, not channel name.
self.data.setdefault(t2, [])
36
self.data[t2].append(data)
43
# NOTE(review): lines 37-42 are missing; this loop presumably belongs to
# an input_get()-style method (it builds `result`, defined in the gap).
for channel,trans in self.trans_in:
44
result.setdefault(channel, [])
45
# One lazy row iterator per incoming transition, grouped by channel.
result[channel].append(trans.source.channel_get(trans))
48
# Source component: reads rows from a CSV file, one dict per row.
class csv_in(component):
49
def __init__(self, filename, *args, **argv):
50
super(csv_in, self).__init__(*args, **argv)
51
# Path of the CSV file to read.
self.filename = filename
54
# NOTE(review): lines 52-53 are missing -- presumably the `def process(self):`
# header this line belongs to.  Python 2 idiom: file() + csv.DictReader.
fp = csv.DictReader(file(self.filename))
58
# Sink component: writes every incoming row to a CSV file, with a
# header row derived from the field names.
class csv_out(component):
59
def __init__(self, filename, *args, **argv):
60
super(csv_out, self).__init__(*args, **argv)
61
# Path of the CSV file to write.
self.filename=filename
66
# NOTE(review): lines 62-65 are missing (likely the process() header).
for channel,trans in self.input_get().items():
67
for iterator in trans:
70
# Python 2 binary-mode open, the csv-module convention of the era.
fp2 = file(self.filename, 'wb+')
72
fp = csv.DictWriter(fp2, fieldnames)
73
# Emit the header by writing a row that maps each field name to itself
# (Python 2 DictWriter has no writeheader()).
fp.writerow(dict(map(lambda x: (x,x), fieldnames)))
77
# Control component: counts the rows seen per input channel and emits
# one {'channel': ..., 'count': ...} summary row per channel on 'main'.
class control_count(component):
80
for channel,trans in self.input_get().items():
81
for iterator in trans:
83
# Per-channel counter, initialised lazily.
datas.setdefault(channel, 0)
87
# NOTE(review): lines 84-86 are missing (the increment and the loop
# binding `d` over the counted channels).
yield {'channel': d, 'count': datas[d]}, 'main'
89
# Transform component: buffers all input rows, then re-emits them
# sorted by a single configurable field.
class sort(component):
90
def __init__(self, fieldname, *args, **argv):
91
super(sort, self).__init__(*args, **argv)
92
# Name of the row field to sort by.
self.fieldname = fieldname
94
# Read all input channels, sort and write to 'main' channel
97
for channel,trans in self.input_get().items():
98
for iterator in trans:
102
# Python 2 cmp-style in-place sort on the configured field.
datas.sort(lambda x,y: cmp(x[self.fieldname],y[self.fieldname]))
106
# Debug component: writes each row to `output` with a per-bloc prefix.
class logger_bloc(component):
107
def __init__(self, name, output=sys.stdout, *args, **argv):
111
# NOTE(review): lines 108-110 are missing -- presumably
# self.name = name and self.output = output (both are used below);
# confirm against the full source.
super(logger_bloc, self).__init__(*args, **argv)
115
for channel,trans in self.input_get().items():
116
for iterator in trans:
120
self.output.write('\tBloc Log '+self.name+str(d)+'\n')
124
# Throttling component: pauses `delay` seconds per incoming row.
class sleep(component):
125
def __init__(self, delay=1, *args, **argv):
127
# NOTE(review): line 126 is missing -- presumably self.delay = delay,
# since self.delay is read below.
super(sleep, self).__init__(*args, **argv)
130
for channel,trans in self.input_get().items():
131
for iterator in trans:
133
time.sleep(self.delay)
136
# Debug component: writes each row to `output` with a per-logger prefix.
class logger(component):
137
def __init__(self, name, output=sys.stdout, *args, **argv):
141
# NOTE(review): lines 138-140 are missing -- presumably
# self.name = name and self.output = output (both are used below).
super(logger, self).__init__(*args, **argv)
144
for channel,trans in self.input_get().items():
145
for iterator in trans:
147
self.output.write('\tLog '+self.name+str(d)+'\n')
150
# Diff component: caches rows from two input channels keyed by the
# configured key fields, and classifies each row as unchanged, updated,
# added or removed relative to the other channel.
class diff(component):
151
def __init__(self, keys, *args, **argv):
156
# NOTE(review): lines 152-155 are missing -- presumably self.keys = keys
# and the self.row = {} cache initialisation (both are used below).
super(diff, self).__init__(*args, **argv)
158
# Return the key of a row
159
def key_get(self, row):
162
# NOTE(review): lines 160-161 are missing (result = [] and the loop
# binding `k` over the configured key fields).
result.append(row[k])
167
for channel,transition in self.input_get().items():
168
# Lazily create the per-channel row cache.
if channel not in self.row:
169
self.row[channel] = {}
171
for key in self.row.keys():
175
for iterator in transition:
177
key = self.key_get(r)
178
# A row whose key already exists on the other channel is either
# unchanged or an update of the cached row.
if other and (key in self.row[other]):
179
if self.row[other][key] == r:
183
# Matched rows are consumed from the other side's cache.
del self.row[other][key]
185
self.row[channel][key] = r
186
# Rows left un-matched after both inputs drain are adds/removes.
todo = ['add','remove']
189
for v in self.row[k].values():
194
# Directed edge of the dataflow graph: connects a source component's
# output channel to a destination component's input channel, and
# registers itself on both endpoints at construction time.
class transition(object):
195
def __init__(self, source, destination,type='data_transition', status='open', channel_source='main', channel_destination='main'):
198
# NOTE(review): lines 196-197 are missing -- presumably self.source =
# source (used below) plus self.type/self.status assignments.
self.destination = destination
199
self.channel_source = channel_source
200
self.channel_destination = channel_destination
201
# Register this edge on both endpoints' transition lists.
self.destination.trans_in.append((channel_destination,self)) #:source.channel_get(self)})
202
self.source.trans_out.append((channel_source,self))
205
# NOTE(review): the enclosing class header (likely `class job`, used in the
# commented-out demo code below) is in the missing lines 203-204.
# NOTE(review): mutable default argument `outputs=[]` is a classic Python
# pitfall (shared across calls) -- flag for fixing in the full source.
def __init__(self,outputs=[]):
209
# Drain every terminal component's generator to run the whole graph.
for c in self.outputs:
210
for a in c.channel_get():
213
# --- Demo wiring (module-level script) --------------------------------
# NOTE(review): several names referenced below (sort1, sleep1, diff1, c1,
# job) are defined in source lines missing from this fragment; the wiring
# shown here is a partial view.
# Demo 1: two CSV sources -> sort -> (sleep -> logger) and -> CSV sink.
csv_in1= csv_in('partner.csv')
214
csv_in2= csv_in('partner1.csv')
215
csv_out1= csv_out('partner2.csv')
217
log1=logger(name='Read Partner File')
218
log2=logger(name='After Sort')
221
tran=transition(csv_in1,sort1)
222
tran1=transition(csv_in2,sort1)
223
tran4=transition(sort1,sleep1)
224
# NOTE(review): `tran4` is rebound here, shadowing the previous edge
# object; both transitions remain registered on the components.
tran4=transition(sleep1,log2)
225
tran5=transition(sort1,csv_out1)
228
#job1=job([csv_out1,log2])
232
# Demo 2 (commented out below): diff two partner files and route the
# same/add/remove/update channels to loggers and a CSV sink.
in1 = csv_in('partner.csv')
233
in2 = csv_in('partner2.csv')
234
in3 = csv_in('partner3.csv')
235
in4 = csv_in('add.csv')
238
log_1 = logger_bloc(name="Original Data")
239
log_2 = logger_bloc(name="Modified Data")
241
log1 = logger(name="Log Same")
242
log2 = logger(name="Log Add")
243
log3 = logger(name="Log Remove")
244
log4 = logger(name="Log Update")
247
csv_out1 = csv_out('add.csv')
249
#transition(in1, log_1)
250
#transition(in2, log_2)
252
#transition(in1, diff1, channel_destination='original')
253
#transition(in2, diff1, channel_destination='modified')
255
#transition(diff1, log1, channel_source="same")
256
#transition(diff1, log3, channel_source="remove")
257
#transition(diff1, log2, channel_source="add")
258
#transition(diff1, csv_out1, channel_source="add")
259
#transition(diff1, log4, channel_source="update")
261
#job = job([log1,log2,log3,log4,csv_out1])
264
# Demo 3: feed two sources through a logger into a counter (`c1`,
# defined in a missing line -- presumably a control_count instance).
transition(in3, log1)
265
transition(in4, log1)
270
transition(log1, c1, channel_destination='end')
272
log10=logger(name='Count')
273
transition(c1, log10)
281
# def test(self, hello):
286
#pickle.dump(b,file('result.pickle','wb+'))