4
# Proof of concept ETL computation
8
# Set up the flags of a new instance from keyword arguments.
# Positional arguments (*args) are accepted but not read in the lines shown here.
# NOTE(review): the bare integer lines between statements look like line-number
# residue from an extraction step - confirm and strip them.
def __init__(self, *args, **argv):
11
# Always set to True here, regardless of the arguments.
self.has_output = True
12
# Taken from the 'is_start' keyword; False when the keyword is absent.
self.is_start = argv.get('is_start', False)
13
# Taken from the 'name' keyword; empty string when the keyword is absent.
self.name = argv.get('name', '')
18
# When a flow starts, it should pass elements like meta to next flows
19
# Forward the start signal: for each transition in self.trans_out, call
# start() on that transition's destination, passing the transition itself.
# The `transition` parameter is not read in the lines shown here.
def start(self, transition=None):
20
for trans in self.trans_out:
21
trans.destination.start(trans)
24
# Called when the transition is done
25
# Record `transition` in self.stopped, compare the incoming transitions
# against that list, and call stop() on the destination of each outgoing
# transition, passing the transition itself.
def stop(self, transition=None):
27
# The argument is appended as-is, so None is recorded when the caller passes nothing.
self.stopped.append(transition)
29
for t in self.trans_in:
30
# NOTE(review): the body of this check is not shown here; presumably it stops
# the propagation while an incoming transition has not reported yet - confirm.
if t not in self.stopped:
35
for trans in self.trans_out:
36
trans.destination.stop(trans)
40
# This function is called for every starting element when the job is run
41
# it should read the element and call the input method on them, finishing with a stop.
47
# Hand `rows` to the destinations of the outgoing transitions.
# A transition receives the rows when no channel was requested, when its
# channel_source equals `channel`, or when it has no channel_source.
def output(self, rows, channel=None):
48
for trans in self.trans_out:
49
# Both `not channel` and `not trans.channel_source` are truthiness tests,
# so any falsy value (None, '', 0) counts as "no channel".
if (not channel) or (trans.channel_source==channel) or (not trans.channel_source):
50
# The destination gets the rows together with the transition they arrived through.
trans.destination.input(rows, trans)
52
def input(self, rows, transition=None):
55
class transition(object):
56
# Wire a new transition between `source` and `destination`: the transition
# appends itself to the source's trans_out list and the destination's
# trans_in list, and stores both channel names.
# `status` defaults to 'open'; it is not read in the lines shown here.
def __init__(self, source, destination, status='open', channel_source=None, channel_destination=None):
58
# NOTE(review): self.source is read here but its assignment is not shown - confirm it is set before this line.
self.source.trans_out.append(self)
59
self.destination = destination
60
self.destination.trans_in.append(self)
62
# Channel names are stored unchanged; None means no channel was given.
self.channel_source = channel_source
63
self.channel_destination = channel_destination
67
def __init__(self, nodes):