~mwinter4/maus/ckov-update

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
"""
Single-threaded dataflows module.
"""
#  This file is part of MAUS: http://micewww.pp.rl.ac.uk:8080/projects/maus
#
#  MAUS is free software: you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
#
#  MAUS is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with MAUS.  If not, see <http://www.gnu.org/licenses/>.

import json
import maus_cpp.run_action_manager

from framework.utilities import DataflowUtilities

class PipelineSingleThreadDataflowExecutor: # pylint: disable=R0902
    """
    @class PipelineSingleThreadDataflowExecutor
    Execute MAUS dataflows as a single-threaded pipeline.

    Events are pulled from the inputer, passed through the transformer and
    merger, and written by the outputer. Run transitions (start_of_run /
    end_of_run) birth and death the transformer and merger around each run.
    """

    def __init__(self, inputer, transformer, merger, outputer, json_config_doc): # pylint: disable=R0913,C0301
        """
        Save references to arguments and parse the JSON configuration
        document.

        @param self Object reference.
        @param inputer Input task.
        @param transformer Transformer task.
        @param merger Merger task.
        @param outputer Output task.
        @param json_config_doc JSON configuration document.
        """
        self.inputer = inputer
        self.transformer = transformer
        self.merger = merger
        self.outputer = outputer
        self.json_config_doc = json_config_doc
        # Parse the configuration JSON; headers and footers are written only
        # when "header_and_footer_mode" is "append".
        head_mode = json.loads(self.json_config_doc)["header_and_footer_mode"]
        self.write_headers = head_mode == "append"
        # Sentinel value "first" marks that no run has been registered yet;
        # replaced by a real (integer) run number on the first Spill.
        self.run_number = "first"
        self.end_of_run_spill = None

    def execute(self, job_header, job_footer):
        """
        Execute the dataflow

        Birth outputer, write job header, birth merger, transformer, inputer.
        Read events from the input, pass through the transform, merge and
        output. Death inputer, transformer, merger; write job footer; death
        outputer.

        Birth order is chosen because I want to write JobHeader as early as
        possible and JobFooter as late as possible.

        @param job_header JobHeader in python (i.e. dicts etc) format.
        @param job_footer JobFooter in python (i.e. dicts etc) format.
        """
        # Note in all the assert statements - new style (API-compliant) modules
        # should raise an exception on fail and return void. Old style modules
        # would return true/false on success/failure of birth and death.
        try:
            print("OUTPUT: Setting up outputer")
            birth = self.outputer.birth(self.json_config_doc)
            assert birth in (True, None)

            print("Writing JobHeader...")
            if self.write_headers:
                self.outputer.save(json.dumps(job_header))

            print("INPUT: Setting up input")
            birth = self.inputer.birth(self.json_config_doc)
            assert birth in (True, None)

            print("PIPELINE: Get event, TRANSFORM, MERGE, OUTPUT, repeat")

            emitter = self.inputer.emitter()
            # Buffer one event at a time; this helps us time how long the
            # setup that sometimes happens in the first event takes.
            print("HINT: MAUS will process 1 event only at first...")
            map_buffer = DataflowUtilities.buffer_input(emitter, 1)

            i = 0
            while len(map_buffer) != 0:
                for event in map_buffer:
                    self.process_event(event)
                i += len(map_buffer)
                map_buffer = DataflowUtilities.buffer_input(emitter, 1)

                # Single-argument print() is valid in both Python 2 and 3,
                # so the progress report is assembled as one string. The
                # double space reproduces the old softspace output exactly.
                print("TRANSFORM/MERGE/OUTPUT:  "
                      "Processed %d events so far, "
                      "%d events in buffer." % (i, len(map_buffer)))
        finally:
            # Cleanup runs whether the pipeline finished or raised; any
            # exception propagates to the caller after cleanup.
            if self.run_number == "first":
                # No Spill was ever seen; register a dummy run 0 so that
                # end_of_run still balances the births below.
                self.run_number = 0
            self.end_of_run(self.run_number)

            print("INPUT: Shutting down inputer")
            death = self.inputer.death()
            assert death in (True, None)
            if self.write_headers:
                self.outputer.save(json.dumps(job_footer))

            print("OUTPUT: Shutting down outputer")
            death = self.outputer.death()
            assert death in (True, None)

    def process_event(self, event):
        """
        Process a single event

        Process a single event - if it is a Spill, check for run_number change
        and call end_of_run/start_of_run if run_number has changed, then pass
        it through the transformer and merger. All events (Spill or not) are
        saved by the outputer.

        @param self Object reference.
        @param event the event in JSON string format.
        """
        event_json = json.loads(event)
        if DataflowUtilities.get_event_type(event_json) == "Spill":
            current_run_number = DataflowUtilities.get_run_number(event_json)
            if DataflowUtilities.is_end_of_run(event_json):
                # Remember that a genuine end_of_run spill was seen so
                # end_of_run() does not need to fabricate one.
                self.end_of_run_spill = event_json
            if current_run_number != self.run_number:
                if self.run_number != "first":
                    self.end_of_run(self.run_number)
                self.start_of_run(current_run_number)
                self.run_number = current_run_number
            event = self.transformer.process(event)
            event = self.merger.process(event)
        self.outputer.save(event)

    def start_of_run(self, new_run_number):
        """
        At the start_of_run, we birth the merger and transformer, then
        call start_of_run on the run_action_manager

        @param self Object reference.
        @param new_run_number run number of the run that is starting
        """
        run_header = maus_cpp.run_action_manager.start_of_run(new_run_number)

        print("MERGE: Setting up merger")
        birth = self.merger.birth(self.json_config_doc)
        assert birth in (True, None)

        print("TRANSFORM: Setting up transformer")
        birth = self.transformer.birth(self.json_config_doc)
        assert birth in (True, None)
        if self.write_headers:
            self.outputer.save(run_header)

    def end_of_run(self, old_run_number):
        """
        At the end_of_run, we death the transformer and merger, then call
        end_of_run on the run_action_manager (note reverse ordering, not that it
        should matter)

        @param self Object reference.
        @param old_run_number run number of the run that is ending
        """
        if self.end_of_run_spill is None:
            # No end_of_run spill came through the pipeline; fabricate one
            # (merger-only - it deliberately bypasses the transformer) so
            # mergers get a chance to flush their state.
            print("  Missing an end_of_run spill...")
            print("  ...creating one to flush the mergers!")
            self.end_of_run_spill = {"daq_event_type":"end_of_run",
                                     "maus_event_type":"Spill",
                                     "run_number":self.run_number,
                                     "spill_number":-1}
            end_of_run_spill_str = json.dumps(self.end_of_run_spill)
            end_of_run_spill_str = self.merger.process(end_of_run_spill_str)
            if self.write_headers: # write to disk only if write_headers is set
                self.outputer.save(end_of_run_spill_str)
        self.end_of_run_spill = None

        print("TRANSFORM: Shutting down transformer")
        death = self.transformer.death()
        assert death in (True, None)

        print("MERGE: Shutting down merger")
        death = self.merger.death()
        assert death in (True, None)

        run_footer = maus_cpp.run_action_manager.end_of_run(old_run_number)
        if self.write_headers:
            self.outputer.save(run_footer)

    @staticmethod
    def get_dataflow_description():
        """
        Get dataflow description.

        @return description.
        """
        description = "Run in a pipeline programming fashion with only a\n"
        description += "single thread. See Wikipedia on 'pipeline\n"
        description += "programming' for more information."
        return description