~openerp-commiter/openobject-addons/stable-sja-branch

« back to all changes in this revision

Viewing changes to etl/specs/prototype/prototype.py

  • Committer: sja-axelor
  • Date: 2009-10-13 09:52:57 UTC
  • Revision ID: suniljagyasi@gmail.com-20091013095257-8u26ww0r20z9y6ey
add

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#csv_in ---> logger
 
2
import csv
 
3
 
 
4
class component(object):
 
5
    def __init__(self,*args, **argv):
 
6
        is_start=True
 
7
        self.data=[]
 
8
        self.job=argv.get('job',False)
 
9
        self.trans_out = []
 
10
        self.trans_in = []
 
11
 
 
12
    def event(self,name):
 
13
        return self.output(event=name)
 
14
    def start(self):
 
15
        self.event('component_start')
 
16
        return True
 
17
    def stop(self):
 
18
        self.event('component_end')
 
19
        return True
 
20
    def run(self,input_data=[]):
 
21
        self.event('component_run')
 
22
        data=self.output(input_data)
 
23
        return data
 
24
 
 
25
    def input(self, rows, transition=None):
 
26
        self.event('component_inputflow')
 
27
        return self.output(rows)
 
28
 
 
29
    def output(self, rows=None, channel=None,event=None):
 
30
        self.event('component_outputflow')
 
31
        for trans in self.trans_out:
 
32
            if (not channel) or (trans.channel_source==channel) or (not trans.channel_source):
 
33
                if trans.type=='data_transition' or (trans.type=='trigger_transition' and event==trans.listen_event):
 
34
                    yield rows, trans.destination
 
35
                else:
 
36
                    yield rows,None
 
37
 
 
38
class csv_in(component):
 
39
    def __init__(self, filename, *args, **argv):
 
40
        super(csv_in, self).__init__(*args, **argv)
 
41
        self.filename = filename
 
42
 
 
43
    def run(self,data=[]):        
 
44
        fp = csv.DictReader(file(self.filename))
 
45
        data=[]
 
46
        for row in fp:
 
47
            data.append(row)
 
48
        return super(csv_in, self).run(data)
 
49
 
 
50
class csv_out(component):
 
51
    def __init__(self, filename, *args, **argv):
 
52
        super(csv_out, self).__init__(*args, **argv)
 
53
        self.filename=filename
 
54
        self.fp=None
 
55
 
 
56
    def run(self,rows=[]):
 
57
        self.fp = file(self.filename, 'wb+')
 
58
        return self.input(rows)
 
59
 
 
60
    def input(self,rows=[]):
 
61
        fieldnames = rows[0].keys()
 
62
        fp = csv.DictWriter(self.fp, fieldnames)
 
63
        fp.writerow(dict(map(lambda x: (x,x), fieldnames)))
 
64
        fp.writerows(rows)        
 
65
        return super(csv_out, self).input(rows)
 
66
 
 
67
class sort(component):
 
68
    def __init__(self, fieldname, *args, **argv):
 
69
        self.fieldname = fieldname
 
70
        super(sort, self).__init__(*args, **argv)    
 
71
 
 
72
    def run(self,rows=[], transition=None):
 
73
        self.data=rows
 
74
        self.data.sort(lambda x,y: cmp(x[self.fieldname],y[self.fieldname]))
 
75
        return super(sort, self).run(self.data)     
 
76
 
 
77
    
 
78
 
 
79
class logger(component):
 
80
    def __init__(self, name, *args, **argv):
 
81
        self.name = name
 
82
        super(logger, self).__init__(*args, **argv) 
 
83
    def run(self,data=[]): 
 
84
        res=[]    
 
85
        print ' Logger : ',self.name
 
86
        for row in data:
 
87
            print row
 
88
            res.append(row)
 
89
        return super(logger, self).run(data)
 
90
 
 
91
class transition(object):
 
92
    def __init__(self, source, destination,type='data_transition', status='open', channel_source=None, channel_destination=None):
 
93
        self.type=type
 
94
        self.source = source
 
95
        self.source.trans_out.append(self)
 
96
        self.destination = destination
 
97
        self.destination.trans_in.append(self)
 
98
        self.status = 'open'
 
99
        self.channel_source = channel_source
 
100
        self.channel_destination = channel_destination
 
101
 
 
102
class job(object):
 
103
    def __init__(self,start_component,components=[]):
 
104
        self.components=components
 
105
        self.start_component=start_component
 
106
        for component in components:
 
107
            component.job=self
 
108
 
 
109
    def run(self):
 
110
        data=None
 
111
        start_component=self.start_component        
 
112
        def _run(data,component):            
 
113
            if not component:
 
114
                return            
 
115
            res=component.start()        
 
116
            if not res:
 
117
                raise Exception('not started component')
 
118
            try:
 
119
                res_list=component.run(data)                
 
120
                for out_data,out_component in res_list:                    
 
121
                    _run(out_data,out_component)
 
122
            except Exception,e:
 
123
                raise e
 
124
            finally:
 
125
                component.stop() 
 
126
        _run(data,start_component)    
 
127
            
 
128
 
 
129
 
 
130
csv_in1= csv_in('partner.csv')
 
131
csv_out1= csv_out('partner2.csv')
 
132
sort1=sort('name')
 
133
log1=logger(name='Read Partner File')
 
134
log2=logger(name='After Sort')
 
135
 
 
136
tran1=transition(csv_in1,log1)
 
137
tran2=transition(csv_in1,sort1)
 
138
tran3=transition(sort1,csv_out1)
 
139
tran3=transition(sort1,log2)
 
140
 
 
141
job1=job(csv_in1,[csv_in1,log1])
 
142
 
 
143
job1.run()
 
144
 
 
145
 
 
146
 
 
147
 
 
148
 
 
149
 
 
150