~openerp-commiter/openobject-addons/extra-6.0

« back to all changes in this revision

Viewing changes to etl/specs/prototype/etl/etl.py

  • Committer: Fabien Pinckaers
  • Date: 2009-01-12 07:30:23 UTC
  • Revision ID: fp@tinyerp.com-20090112073023-t6pon9a1d16bycby
Adding_specs_etl

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#!/usr/bin/python
 
2
 
 
3
#
 
4
# Proof of concept ETL computation
 
5
#
 
6
 
 
7
class node(object):
    """Basic element of the proof-of-concept ETL graph.

    A node keeps the transitions that leave it (``trans_out``) and the
    transitions that reach it (``trans_in``).  The default implementation
    is a pass-through: whatever arrives through ``input`` is forwarded
    unchanged through ``output``.
    """

    def __init__(self, *args, **argv):
        """Initialise the node.

        Positional arguments are accepted but not used here.  Recognised
        keyword arguments:

        * ``name``     -- label of the node (default ``''``)
        * ``is_start`` -- flag read by the job to decide whether to call
          ``run`` on this node (default ``False``)
        """
        self.name = argv.get('name', '')
        self.is_start = argv.get('is_start', False)
        self.meta = None
        self.has_input = True
        self.has_output = True
        # Transitions register themselves in these two lists.
        self.trans_in = []
        self.trans_out = []
        # Incoming transitions that have already signalled a stop.
        self.stopped = []

    # When a flow starts, it should pass elements like meta to next flows
    def start(self, transition=None):
        """Propagate the start signal along every outgoing transition.

        ``transition`` is the link the signal arrived through; it is not
        used by this base implementation.  Always returns ``True``.
        """
        for link in self.trans_out:
            link.destination.start(link)
        return True

    # Called when the transition is done
    def stop(self, transition=None):
        """Record a stop signal and forward it once every input is done.

        When ``transition`` is given it is appended to ``self.stopped`` and
        the node is considered finished only if every transition listed in
        ``self.trans_in`` is present in ``self.stopped``.  Without a
        transition the node is considered finished immediately.

        Returns ``True`` when the stop was forwarded to the destinations of
        all outgoing transitions, ``False`` otherwise.
        """
        finished = True
        if transition:
            self.stopped.append(transition)
            finished = all(incoming in self.stopped
                           for incoming in self.trans_in)
        if not finished:
            return finished
        for link in self.trans_out:
            link.destination.stop(link)
        return finished

    #
    # Called for every starting element when the job is run; it should
    # read the element and call the input method on the next nodes,
    # finishing with a stop.
    #
    def run(self):
        """Send a start signal and then a stop signal through the graph."""
        self.start()
        self.stop()

    def output(self, rows, channel=None):
        """Hand ``rows`` to the destination of matching outgoing transitions.

        A transition is skipped only when a ``channel`` is requested, the
        transition declares a ``channel_source`` and the two differ.  In
        every other case the destination's ``input`` is called with the
        rows and the transition itself.
        """
        for link in self.trans_out:
            declared = link.channel_source
            if channel and declared and not (declared == channel):
                continue
            link.destination.input(rows, link)

    def input(self, rows, transition=None):
        """Receive ``rows`` and forward them unchanged on all channels.

        ``transition`` is the link the rows arrived through; it is not
        used by this base implementation.
        """
        self.output(rows)
 
54
 
 
55
class transition(object):
    """Directed link between two nodes of the ETL graph.

    Creating a transition registers it on both ends: it is appended to
    ``source.trans_out`` and to ``destination.trans_in``.
    """

    def __init__(self, source, destination, status='open', channel_source=None, channel_destination=None):
        """Connect ``source`` to ``destination``.

        ``source`` must expose a ``trans_out`` list and ``destination`` a
        ``trans_in`` list; the new transition is appended to both.

        ``status`` is stored as given (default ``'open'``).
        ``channel_source`` and ``channel_destination`` are stored as given
        (default ``None``).
        """
        self.source = source
        self.source.trans_out.append(self)
        self.destination = destination
        self.destination.trans_in.append(self)
        # Store the caller's value: the previous code assigned the literal
        # 'open' here, silently discarding the ``status`` argument.
        self.status = status
        self.channel_source = channel_source
        self.channel_destination = channel_destination
 
64
 
 
65
 
 
66
class job(object):
    """A collection of nodes that can be executed together."""

    def __init__(self, nodes):
        """Store ``nodes``, the iterable of nodes this job is made of."""
        self.nodes = nodes

    def run(self):
        """Call ``run`` on each node whose ``is_start`` flag is set.

        Nodes are visited in the order of ``self.nodes``; the flag of each
        node is checked just before that node would be run.
        """
        for candidate in self.nodes:
            if not candidate.is_start:
                continue
            candidate.run()
 
74