~openerp-commiter/openobject-addons/stable-sja-branch

Viewing changes to etl/specs/prototype2/proto_new.py

  • Committer: sja-axelor
  • Date: 2009-10-13 09:52:57 UTC
  • Revision ID: suniljagyasi@gmail.com-20091013095257-8u26ww0r20z9y6ey
add

import csv
import sys
import time

# Base class of every ETL node: components are linked by transitions and
# exchange rows through named channels (the default channel is 'main').
class component(object):
    is_end = False
    def __init__(self, *args, **argv):
        self.trans_in = []
        self.trans_out = []
        self.is_output = False
        self.data = {}
        self.generator = None

    def generator_get(self, transition):
        if self.generator:
            return self.generator
        self.generator = self.process()
        return self.generator

    # Yield the rows buffered for a transition, pulling more data from
    # process() and dispatching it to the outgoing transitions as needed.
    def channel_get(self, trans=None):
        self.data.setdefault(trans, [])
        gen = self.generator_get(trans) or []
        while True:
            if self.data[trans]:
                yield self.data[trans].pop(0)
                continue
            elif self.data[trans] is None:
                raise StopIteration
            data, chan = gen.next()
            if data is None:
                raise StopIteration
            for t, t2 in self.trans_out:
                if (t == chan) or (not t) or (not chan):
                    self.data.setdefault(t2, [])
                    self.data[t2].append(data)

    def process(self):
        pass

    # Map each input channel to the iterators of its incoming transitions.
    def input_get(self):
        result = {}
        for channel, trans in self.trans_in:
            result.setdefault(channel, [])
            result[channel].append(trans.source.channel_get(trans))
        return result

# Read rows from a CSV file and push them on the 'main' channel.
class csv_in(component):
    def __init__(self, filename, *args, **argv):
        super(csv_in, self).__init__(*args, **argv)
        self.filename = filename

    def process(self):
        fp = csv.DictReader(open(self.filename, 'rb'))
        for row in fp:
            yield row, 'main'

# Write every incoming row to a CSV file and forward it on 'main'.
class csv_out(component):
    def __init__(self, filename, *args, **argv):
        super(csv_out, self).__init__(*args, **argv)
        self.filename = filename

    def process(self):
        fp = None
        fp2 = None
        for channel, trans in self.input_get().items():
            for iterator in trans:
                for d in iterator:
                    if not fp2:
                        # Open the file lazily and write the header from
                        # the keys of the first row received.
                        fp2 = open(self.filename, 'wb+')
                        fieldnames = d.keys()
                        fp = csv.DictWriter(fp2, fieldnames)
                        fp.writerow(dict(map(lambda x: (x, x), fieldnames)))
                    fp.writerow(d)
                    yield d, 'main'

# Count the rows received on each input channel and emit one summary
# row per channel on 'main'.
class control_count(component):
    def process(self):
        datas = {}
        for channel, trans in self.input_get().items():
            for iterator in trans:
                for d in iterator:
                    datas.setdefault(channel, 0)
                    datas[channel] += 1

        for d in datas:
            yield {'channel': d, 'count': datas[d]}, 'main'

class sort(component):
    def __init__(self, fieldname, *args, **argv):
        super(sort, self).__init__(*args, **argv)
        self.fieldname = fieldname

    # Read all input channels, sort and write to 'main' channel
    def process(self):
        datas = []
        for channel, trans in self.input_get().items():
            for iterator in trans:
                for d in iterator:
                    datas.append(d)

        datas.sort(lambda x, y: cmp(x[self.fieldname], y[self.fieldname]))
        for d in datas:
            yield d, 'main'

# Buffer all incoming rows, then log and forward them in one block.
class logger_bloc(component):
    def __init__(self, name, output=sys.stdout, *args, **argv):
        self.name = name
        self.output = output
        self.is_end = 'main'
        super(logger_bloc, self).__init__(*args, **argv)

    def process(self):
        datas = []
        for channel, trans in self.input_get().items():
            for iterator in trans:
                for d in iterator:
                    datas.append(d)
        for d in datas:
            self.output.write('\tBloc Log ' + self.name + str(d) + '\n')
            yield d, 'main'


# Delay each row by a fixed number of seconds before forwarding it.
class sleep(component):
    def __init__(self, delay=1, *args, **argv):
        self.delay = delay
        super(sleep, self).__init__(*args, **argv)

    def process(self):
        for channel, trans in self.input_get().items():
            for iterator in trans:
                for d in iterator:
                    time.sleep(self.delay)
                    yield d, 'main'

# Log each row as it passes through and forward it on 'main'.
class logger(component):
    def __init__(self, name, output=sys.stdout, *args, **argv):
        self.name = name
        self.output = output
        self.is_end = 'main'
        super(logger, self).__init__(*args, **argv)

    def process(self):
        for channel, trans in self.input_get().items():
            for iterator in trans:
                for d in iterator:
                    self.output.write('\tLog ' + self.name + str(d) + '\n')
                    yield d, 'main'

# Compare the rows of two input channels on a list of key fields and
# dispatch them to the 'same', 'update', 'add' and 'remove' channels.
class diff(component):
    def __init__(self, keys, *args, **argv):
        self.keys = keys
        self.row = {}
        self.diff = []
        self.same = []
        super(diff, self).__init__(*args, **argv)

    # Return the key of a row
    def key_get(self, row):
        result = []
        for k in self.keys:
            result.append(row[k])
        return tuple(result)

    def process(self):
        self.row = {}
        for channel, transition in self.input_get().items():
            if channel not in self.row:
                self.row[channel] = {}
            other = None
            for key in self.row.keys():
                if key != channel:
                    other = key
                    break
            for iterator in transition:
                for r in iterator:
                    key = self.key_get(r)
                    if other and (key in self.row[other]):
                        if self.row[other][key] == r:
                            yield r, 'same'
                        else:
                            yield r, 'update'
                        del self.row[other][key]
                    else:
                        self.row[channel][key] = r
        # Rows left over in only one channel are additions or removals.
        todo = ['add', 'remove']
        for k in self.row:
            channel = todo.pop()
            for v in self.row[k].values():
                yield v, channel


# Directed link between two components; registers itself on both ends.
class transition(object):
    def __init__(self, source, destination, type='data_transition', status='open', channel_source='main', channel_destination='main'):
        self.type = type
        self.source = source
        self.destination = destination
        self.channel_source = channel_source
        self.channel_destination = channel_destination
        self.destination.trans_in.append((channel_destination, self))
        self.source.trans_out.append((channel_source, self))

class job(object):
    def __init__(self, outputs=None):
        # Avoid a shared mutable default argument.
        self.outputs = outputs or []

    def run(self):
        # Pull every output component; this drives the whole pipeline.
        for c in self.outputs:
            for a in c.channel_get():
                pass

csv_in1 = csv_in('partner.csv')
csv_in2 = csv_in('partner1.csv')
csv_out1 = csv_out('partner2.csv')
sort1 = sort('name')
log1 = logger(name='Read Partner File')
log2 = logger(name='After Sort')
sleep1 = sleep(1)

tran = transition(csv_in1, sort1)
tran1 = transition(csv_in2, sort1)
tran4 = transition(sort1, sleep1)
tran4 = transition(sleep1, log2)
tran5 = transition(sort1, csv_out1)

#job1 = job([csv_out1, log2])
#job1.run()

in1 = csv_in('partner.csv')
in2 = csv_in('partner2.csv')
in3 = csv_in('partner3.csv')
in4 = csv_in('add.csv')
diff1 = diff(['id'])

log_1 = logger_bloc(name="Original Data")
log_2 = logger_bloc(name="Modified Data")

log1 = logger(name="Log Same")
log2 = logger(name="Log Add")
log3 = logger(name="Log Remove")
log4 = logger(name="Log Update")

csv_out1 = csv_out('add.csv')

#transition(in1, log_1)
#transition(in2, log_2)
#
#transition(in1, diff1, channel_destination='original')
#transition(in2, diff1, channel_destination='modified')
#
#transition(diff1, log1, channel_source="same")
#transition(diff1, log3, channel_source="remove")
#transition(diff1, log2, channel_source="add")
#transition(diff1, csv_out1, channel_source="add")
#transition(diff1, log4, channel_source="update")
#
#job = job([log1, log2, log3, log4, csv_out1])
#job.run()

transition(in3, log1)
transition(in4, log1)

c1 = control_count()
transition(in3, c1)
transition(in4, c1)
transition(log1, c1, channel_destination='end')

log10 = logger(name='Count')
transition(c1, log10)

job = job([log10])
job.run()


#class a(object):
#    def test(self, hello):
#        print hello
#b = a()
#
#import pickle
#pickle.dump(b, file('result.pickle', 'wb+'))
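
Illustration only, not part of the committed file: a minimal sketch of how an extra component might be written against the component/transition/job API defined above. The field_filter class, its predicate and the wiring are hypothetical, and the wiring is left commented out in the style of the examples in the script.

# Hypothetical component (illustration, not in the commit): forward only
# the rows for which a predicate returns true, on the 'main' channel.
class field_filter(component):
    def __init__(self, predicate, *args, **argv):
        self.predicate = predicate
        super(field_filter, self).__init__(*args, **argv)

    def process(self):
        for channel, trans in self.input_get().items():
            for iterator in trans:
                for d in iterator:
                    if self.predicate(d):
                        yield d, 'main'

# Hypothetical wiring (note: 'job' is rebound to an instance above, so a
# real run would need the class kept under another name):
#src = csv_in('partner.csv')
#flt = field_filter(lambda row: row.get('name'))
#out = logger(name='Filtered')
#transition(src, flt)
#transition(flt, out)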