~ajdobbs/maus/event-selection

« back to all changes in this revision

Viewing changes to src/common_py/framework/single_thread.py

  • Committer: Chris Rogers
  • Date: 2012-09-20 16:06:44 UTC
  • mto: (663.23.1 _maus_merge)
  • mto: This revision was merged to the branch mainline in revision 676.
  • Revision ID: chris.rogers@stfc.ac.uk-20120920160644-fcebucjuftmzfxyl
Now able to write job header. job footer, run header, run footer - but some fails in integration tests

Show diffs side-by-side

added added

removed removed

Lines of Context:
46
46
        self.json_config_doc = json_config_doc
47
47
        #  Parse the configuration JSON
48
48
        self.json_config_dictionary = json.loads(self.json_config_doc)
 
49
        self.run_number = "first" # used to register first run
49
50
 
50
 
    def execute(self, job_header):
 
51
    def execute(self, job_header, job_footer):
51
52
        """
52
 
        Execute the dataflow - events are, in turn, read from the 
53
 
        input, passed through the transform, merge and output.
54
 
 
55
 
        @param self Object reference.
 
53
        Execute the dataflow
 
54
 
 
55
        Birth outputter, write job header, birth merger, transformer, inputter.
 
56
        Read events from the input, pass through the transform, merge and
 
57
        output. Death inputter, transformer, merger; write job footer; death
 
58
        outputter.
 
59
 
 
60
        Birth order is chosen because I want to write JobHeader as early as
 
61
        possible and JobFooter as late as possible.
 
62
 
 
63
        @param job_header JobHeader in python (i.e. dicts etc) format.
 
64
        @param job_footer JobFooter in python (i.e. dicts etc) format.
56
65
        """
57
66
        # Note in all the assert statements - new style (API-compliant) modules
58
67
        # should raise an exception on fail and return void. Old style modules
59
68
        # would return true/false on success/failure of birth and death.
60
69
        try:
61
 
            print("INPUT: Reading input")
62
 
            assert(self.inputer.birth(self.json_config_doc) == True or \
63
 
                   self.inputer.birth(self.json_config_doc) == None)
64
 
            emitter = self.inputer.emitter()
65
 
 
66
 
            # NEEDS TO BE UNCOMMENTED - but only when I figure out how to
67
 
            # implement in multithreaded mode also
68
 
            #print "START OF RUN: Calling start of run"
69
 
            #run_number=DataflowUtilities.get_run_number
70
 
            #                                        (json.loads(map_buffer[0]))
71
 
            #maus_cpp.run_action_manager.start_of_run(run_number)
72
 
 
73
 
            print("TRANSFORM: Setting up transformer")
74
 
            assert(self.transformer.birth(self.json_config_doc) == True or \
75
 
                   self.transformer.birth(self.json_config_doc) == None)
76
 
 
77
 
            print("MERGE: Setting up merger")
78
 
            assert(self.merger.birth(self.json_config_doc) == True or \
79
 
                   self.merger.birth(self.json_config_doc) == None)
80
 
 
81
70
            print("OUTPUT: Setting up outputer")
82
71
            assert(self.outputer.birth(self.json_config_doc) == True or \
83
72
                   self.outputer.birth(self.json_config_doc) == None)
84
73
 
 
74
            print("Writing JobHeader...")
 
75
            self.outputer.save(json.dumps(job_header))
 
76
 
 
77
            print("INPUT: Setting up input")
 
78
            assert(self.inputer.birth(self.json_config_doc) == True or \
 
79
                   self.inputer.birth(self.json_config_doc) == None)
 
80
 
85
81
            print("PIPELINE: Get event, TRANSFORM, MERGE, OUTPUT, repeat")
86
82
 
87
 
            print("Processing JobHeader...")
88
 
            self.outputer.save(json.dumps(job_header))
89
 
 
 
83
            emitter = self.inputer.emitter()
90
84
            # This helps us time how long the setup that sometimes happens
91
85
            # in the first event takes
92
86
            print("HINT: MAUS will process 1 event only at first...")
95
89
            i = 0
96
90
            while len(map_buffer) != 0:
97
91
                for event in map_buffer:
98
 
                    event_json = json.loads(event)
99
 
                    if "maus_event_type" in event_json and \
100
 
                       event_json["maus_event_type"] == "Spill":
101
 
                        event = self.transformer.process(event)
102
 
                        event = self.merger.process(event)
103
 
                    self.outputer.save(event)
104
 
 
 
92
                    self.process_event(event)
105
93
                i += len(map_buffer)
106
94
                map_buffer = DataflowUtilities.buffer_input(emitter, 1)
107
95
 
117
105
            print("INPUT: Shutting down inputer")
118
106
            assert(self.inputer.death() == True or \
119
107
                   self.inputer.death() == None)
120
 
 
121
 
            print("TRANSFORM: Shutting down transformer")
122
 
            assert(self.transformer.death() == True or \
123
 
                   self.transformer.death() == None)
124
 
 
125
 
            print("MERGE: Shutting down merger")
126
 
            assert(self.merger.death() == True or \
127
 
                   self.merger.death() == None)
 
108
            if self.run_number == "first":
 
109
                self.run_number = 0
 
110
            self.end_of_run(self.run_number)
 
111
            self.outputer.save(json.dumps(job_footer))
128
112
 
129
113
            print("OUTPUT: Shutting down outputer")
130
114
            assert(self.outputer.death() == True or \
131
115
                   self.outputer.death() == None)
132
 
            maus_cpp.run_action_manager.end_of_run()
 
116
 
 
117
    def process_event(self, event):
 
118
        """
 
119
        Process a single event
 
120
        
 
121
        Process a single event - if it is a Spill, check for run_number change
 
122
        and call EndOfEvent/StartOfEvent if run_number has changed.
 
123
        """
 
124
        event_json = json.loads(event)
 
125
        if DataflowUtilities.get_event_type(event_json) == "Spill":
 
126
            current_run_number = DataflowUtilities.get_run_number(event_json)
 
127
            if current_run_number != self.run_number:
 
128
                if self.run_number != "first":
 
129
                    self.end_of_run(self.run_number)
 
130
                self.start_of_run(current_run_number)
 
131
                self.run_number = current_run_number
 
132
            event = self.transformer.process(event)
 
133
            event = self.merger.process(event)
 
134
        self.outputer.save(event)
 
135
 
 
136
    def start_of_run(self, new_run_number):
 
137
        """
 
138
        At the start_of_run, we birth the merger and transformer, then
 
139
        call start_of_run on the run_action_manager
 
140
 
 
141
        @param new_run_number run number of the run that is starting
 
142
        """
 
143
        print("MERGE: Setting up merger")
 
144
        assert(self.merger.birth(self.json_config_doc) == True or \
 
145
               self.merger.birth(self.json_config_doc) == None)
 
146
 
 
147
        print("TRANSFORM: Setting up transformer")
 
148
        assert(self.transformer.birth(self.json_config_doc) == True or \
 
149
               self.transformer.birth(self.json_config_doc) == None)
 
150
 
 
151
        run_header = maus_cpp.run_action_manager.start_of_run(new_run_number)
 
152
        self.outputer.save(run_header)
 
153
 
 
154
    def end_of_run(self, old_run_number):
 
155
        """
 
156
        At the end_of_run, we death the transformer and merger, then call
 
157
        end_of_run on the run_action_manager (note reverse ordering, not that it
 
158
        should matter)
 
159
 
 
160
        @param old_run_number run number of the run that is ending
 
161
        """
 
162
        print("TRANSFORM: Shutting down transformer")
 
163
        assert(self.transformer.death() == True or \
 
164
               self.transformer.death() == None)
 
165
 
 
166
        print("MERGE: Shutting down merger")
 
167
        assert(self.merger.death() == True or \
 
168
               self.merger.death() == None)
 
169
 
 
170
        run_footer = maus_cpp.run_action_manager.end_of_run(old_run_number)
 
171
        self.outputer.save(run_footer)
133
172
 
134
173
    @staticmethod
135
174
    def get_dataflow_description():