46
46
self.json_config_doc = json_config_doc
47
47
# Parse the configuration JSON
48
48
self.json_config_dictionary = json.loads(self.json_config_doc)
49
self.run_number = "first" # used to register first run
50
def execute(self, job_header, job_footer):
    """
    Execute the dataflow - events are, in turn, read from the
    input, passed through the transform, merge and output.

    Birth outputter, write job header; read events from the input, pass
    each through the transform, merge and output (transformer and merger
    are birthed lazily at start of run). Death inputter, transformer,
    merger; write job footer; death outputter.

    Birth order is chosen because I want to write JobHeader as early as
    possible and JobFooter as late as possible.

    @param self Object reference.
    @param job_header JobHeader in python (i.e. dicts etc) format.
    @param job_footer JobFooter in python (i.e. dicts etc) format.
    """
    # Note in all the assert statements - new style (API-compliant) modules
    # should raise an exception on fail and return void. Old style modules
    # would return true/false on success/failure of birth and death.
    # The result is stored before asserting so birth/death is only ever
    # called once per module.
    print("OUTPUT: Setting up outputer")
    birthed = self.outputer.birth(self.json_config_doc)
    assert birthed == True or birthed is None

    print("Writing JobHeader...")
    self.outputer.save(json.dumps(job_header))

    print("INPUT: Setting up input")
    birthed = self.inputer.birth(self.json_config_doc)
    assert birthed == True or birthed is None

    print("PIPELINE: Get event, TRANSFORM, MERGE, OUTPUT, repeat")
    emitter = self.inputer.emitter()
    # This helps us time how long the setup that sometimes happens
    # in the first event takes
    print("HINT: MAUS will process 1 event only at first...")
    map_buffer = DataflowUtilities.buffer_input(emitter, 1)

    i = 0  # running count of events read so far
    while len(map_buffer) != 0:
        for event in map_buffer:
            # process_event births transformer/merger via start_of_run
            # when the first Spill (or a new run number) arrives
            self.process_event(event)
        i += len(map_buffer)
        map_buffer = DataflowUtilities.buffer_input(emitter, 1)

    print("INPUT: Shutting down inputer")
    died = self.inputer.death()
    assert died == True or died is None

    # run_number is still "first" when no Spill was ever seen, in which
    # case no run was started and there is nothing to end
    if self.run_number != "first":
        self.end_of_run(self.run_number)
    self.outputer.save(json.dumps(job_footer))

    print("OUTPUT: Shutting down outputer")
    died = self.outputer.death()
    assert died == True or died is None
def process_event(self, event):
    """
    Process a single event.

    Process a single event - if it is a Spill, check for run_number change
    and call end_of_run/start_of_run if run_number has changed, then pass
    the event through the transformer and merger and save it with the
    outputer.

    @param self Object reference.
    @param event the event as a JSON document string.
    """
    event_json = json.loads(event)
    if DataflowUtilities.get_event_type(event_json) == "Spill":
        current_run_number = DataflowUtilities.get_run_number(event_json)
        if current_run_number != self.run_number:
            # "first" is the sentinel meaning no run has started yet,
            # so there is no previous run to end
            if self.run_number != "first":
                self.end_of_run(self.run_number)
            self.start_of_run(current_run_number)
            self.run_number = current_run_number
        # NOTE(review): non-Spill events are parsed but neither
        # transformed, merged nor saved here - confirm this is intended
        event = self.transformer.process(event)
        event = self.merger.process(event)
        self.outputer.save(event)
def start_of_run(self, new_run_number):
    """
    At the start_of_run, we birth the merger and transformer, then
    call start_of_run on the run_action_manager.

    @param self Object reference.
    @param new_run_number run number of the run that is starting.
    """
    # Store the result before asserting so birth is only called once;
    # old-style modules return True, new-style (API-compliant) modules
    # return None and raise on failure.
    print("MERGE: Setting up merger")
    birthed = self.merger.birth(self.json_config_doc)
    assert birthed == True or birthed is None

    print("TRANSFORM: Setting up transformer")
    birthed = self.transformer.birth(self.json_config_doc)
    assert birthed == True or birthed is None

    run_header = maus_cpp.run_action_manager.start_of_run(new_run_number)
    self.outputer.save(run_header)
def end_of_run(self, old_run_number):
    """
    At the end_of_run, we death the transformer and merger, then call
    end_of_run on the run_action_manager (note reverse ordering relative
    to start_of_run).

    @param self Object reference.
    @param old_run_number run number of the run that is ending.
    """
    # Store the result before asserting so death is only called once;
    # old-style modules return True, new-style (API-compliant) modules
    # return None and raise on failure.
    print("TRANSFORM: Shutting down transformer")
    died = self.transformer.death()
    assert died == True or died is None

    print("MERGE: Shutting down merger")
    died = self.merger.death()
    assert died == True or died is None

    run_footer = maus_cpp.run_action_manager.end_of_run(old_run_number)
    self.outputer.save(run_footer)
135
174
def get_dataflow_description():