"""
Single-threaded dataflows module.
"""
# This file is part of MAUS: http://micewww.pp.rl.ac.uk:8080/projects/maus
#
# MAUS is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# MAUS is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with MAUS. If not, see <http://www.gnu.org/licenses/>.
import json

from framework.utilities import DataflowUtilities
class PipelineSingleThreadDataflowExecutor:
    """
    @class PipelineSingleThreadDataflowExecutor
    Execute MAUS dataflows as a single-threaded pipeline.
    """

    def __init__(self, inputer, transformer, merger, outputer, json_config_doc): # pylint: disable=R0913,C0301
        """
        Save references to arguments and parse the JSON configuration.
        @param self Object reference.
        @param inputer Input task.
        @param transformer Transformer task.
        @param merger Merger task.
        @param outputer Output task.
        @param json_config_doc JSON configuration document.
        """
        self.inputer = inputer
        self.transformer = transformer
        # This assignment was missing: execute() reads self.merger.
        self.merger = merger
        self.outputer = outputer
        self.json_config_doc = json_config_doc
        # Parse the configuration JSON
        self.json_config_dictionary = json.loads(self.json_config_doc)

    def execute(self):
        """
        Execute the dataflow - spills are, in turn, read from the
        input, passed through the transform, merge and output.
        @param self Object reference.
        """
        print("INPUT: Reading input")
        assert(self.inputer.birth(self.json_config_doc) == True)
        emitter = self.inputer.emitter()
        # This helps us time how long the setup that sometimes happens
        # in the first spill takes
        print("HINT: MAUS will process 1 spill only at first...")
        map_buffer = DataflowUtilities.buffer_input(emitter, 1)

        print("TRANSFORM: Setting up transformer (this can take a while...)")
        assert(self.transformer.birth(self.json_config_doc) == True)
        print("MERGE: Setting up merger")
        assert(self.merger.birth(self.json_config_doc) == True)
        print("OUTPUT: Setting up outputer")
        assert(self.outputer.birth(self.json_config_doc) == True)

        print("PIPELINE: Get spill, TRANSFORM, MERGE, OUTPUT, repeat")
        # Count of spills processed so far (was referenced but never
        # defined in the damaged source).
        i = 0
        while len(map_buffer) != 0:
            for spill in map_buffer:
                spill = self.transformer.process(spill)
                spill = self.merger.process(spill)
                self.outputer.save(spill)
                i += 1
            # Refill the buffer one spill at a time.
            map_buffer = DataflowUtilities.buffer_input(emitter, 1)
            # Single Python-3-compatible print() replacing the old
            # Python 2 comma-separated print statements.
            print("TRANSFORM/MERGE/OUTPUT: Processed %d spills so far, "
                  "%d spills in buffer." % (i, len(map_buffer)))

        print("CLOSING PIPELINE: Sending END_OF_RUN to merger")
        # NOTE(review): this dict was truncated in the damaged source;
        # only the visible keys are reconstructed here - confirm against
        # the merger's expected end_of_run event schema.
        end_of_run = {"daq_data": None, "daq_event_type": "end_of_run"}
        end_of_run_spill = json.dumps(end_of_run)
        spill = self.merger.process(end_of_run_spill)
        self.outputer.save(spill)

        print("TRANSFORM: Shutting down transformer")
        assert(self.transformer.death() == True)
        print("MERGE: Shutting down merger")
        assert(self.merger.death() == True)
        print("OUTPUT: Shutting down outputer")
        assert(self.outputer.death() == True)

    @staticmethod
    def get_dataflow_description():
        """
        Get dataflow description.
        @return string describing the dataflow (the damaged source built
        this string but never returned it).
        """
        description = "Run in a pipeline programming fashion with only a\n"
        description += "single thread. See Wikipedia on 'pipeline\n"
        description += "programming' for more information."
        return description