~i-taylor/maus/map-base

« back to all changes in this revision

Viewing changes to bin/analyze_data_online.py

  • Committer: Ian Taylor
  • Date: 2013-10-22 15:26:19 UTC
  • mfrom: (912.1.93 merge)
  • Revision ID: i.taylor@warwick.ac.uk-20131022152619-bv3mfc3dd1tnyqoy
Merging with current merge branch.

Show diffs side-by-side

added added

removed removed

Lines of Context:
44
44
import signal
45
45
import subprocess
46
46
import time
 
47
import string # pylint: disable=W0402
47
48
import pymongo
48
49
 
49
50
MONGODB = 'maus-new' # no '.' character
50
51
LOCKFILE = os.path.join(os.environ['MAUS_ROOT_DIR'], 'tmp', '.maus_lockfile')
51
52
PROCESSES = []
 
53
# maximum % of total memory usage before a process is restarted
 
54
MAX_MEM_USAGE = 15.
 
55
 
 
56
# time between polls in seconds
52
57
POLL_TIME = 10
 
58
 
 
59
# list of reducers to be used in the online job
53
60
REDUCER_LIST = [
54
61
  'reconstruct_daq_scalars_reducer.py',
55
62
  'reconstruct_daq_tof_reducer.py',
 
63
  'reconstruct_daq_tofcalib_reducer.py',
56
64
  'reconstruct_daq_ckov_reducer.py',
 
65
  'reconstruct_daq_kl_reducer.py',
57
66
  'reconstruct_monitor_reducer.py',
58
67
]
59
68
 
 
69
class OnlineProcess:
 
70
    """
 
71
    Wrapper for a subprocess POpen object
 
72
 
 
73
    Wraps a subprocess with additional functionality to check memory usage and
 
74
    kill the subprocess and restart it in the case of a leak. See #1328.
 
75
    """
 
76
 
 
77
    def __init__(self, subprocess_arg_list, log_file):
 
78
        """
 
79
        Set up the log file and start the subprocess
 
80
        """
 
81
        self.arg_list = subprocess_arg_list
 
82
        self.log_name = log_file
 
83
        self.log = open(log_file, "w")
 
84
        self.subproc = None
 
85
        self._start_process()
 
86
 
 
87
    def poll(self):
 
88
        """
 
89
        Returns None if the process is running or the returncode if it finished
 
90
 
 
91
        Checks memory footprint for the subprocess and restarts the process if
 
92
        it exceeeds MAX_MEM_USAGE
 
93
        """
 
94
        proc = self.subproc
 
95
        poll_out = proc.poll()
 
96
        if poll_out == None:
 
97
            mem_usage = self.memory_usage()
 
98
            if mem_usage > MAX_MEM_USAGE:
 
99
                print mem_usage, "% of memory used for process", proc.pid, \
 
100
                      "which exceeds the maximum", MAX_MEM_USAGE, \
 
101
                      "% - restarting the process"
 
102
                cleanup([self])
 
103
                self._start_process()
 
104
            print str(proc.pid).rjust(6), str(mem_usage).ljust(6),
 
105
        else:
 
106
            print '\nProcess', proc.pid, 'failed'
 
107
        return poll_out
 
108
 
 
109
    def memory_usage(self):
 
110
        """
 
111
        Return the memory usage (%) of the associated subprocess
 
112
        """
 
113
        ps_out = subprocess.check_output(['ps', '-p', str(self.subproc.pid),
 
114
                                          'h', '-o%mem'])
 
115
        return float(ps_out.strip(' \n'))
 
116
 
 
117
    def _start_process(self):
 
118
        """
 
119
        Start the subprocess
 
120
        """
 
121
        self.subproc = subprocess.Popen(self.arg_list, \
 
122
                                      stdout=self.log, stderr=subprocess.STDOUT)
 
123
        print 'Started process with pid', self.subproc.pid, 'and log file', \
 
124
              self.log_name
 
125
 
60
126
def poll_processes(proc_list):
61
127
    """
62
128
    Poll processes in process list. Return True if processes are all running,
64
130
    """
65
131
    all_ok = True
66
132
    for proc in proc_list:
67
 
        running = proc.poll() == None
68
 
        all_ok = all_ok and running
69
 
        if running:
70
 
            print '.',
71
 
        else:
72
 
            print '\nProcess', proc.pid, 'failed'
 
133
        all_ok = all_ok and proc.poll() == None
73
134
    print
74
135
    return all_ok
75
136
 
77
138
    """
78
139
    Open the celery demon process - sets up workers for MAUS to reconstruct on
79
140
    """
80
 
    print 'Starting celery with log file ', celeryd_log_file_name,
81
 
    log = open(celeryd_log_file_name, 'w')
82
 
    proc = subprocess.Popen(['celeryd', '-c8', '-lINFO', '--purge'], \
83
 
                                           stdout=log, stderr=subprocess.STDOUT)
84
 
    print 'with pid', proc.pid # pylint: disable = E1101
 
141
    print 'Starting celery... ',
 
142
    proc = OnlineProcess(['celeryd', '-c8', '-lINFO', '--purge'],
 
143
                         celeryd_log_file_name)
85
144
    return proc
86
145
 
87
146
def maus_web_app_process(maus_web_log_file_name):
89
148
    Open the maus web app process - dynamically generates web pages for MAUS
90
149
    output display
91
150
    """
92
 
    print 'Starting maus web app with log file ', maus_web_log_file_name,
93
 
    log = open(maus_web_log_file_name, 'w')
 
151
    print 'Starting maus web app...',
94
152
    maus_web = os.path.join(os.environ['MAUS_WEB_DIR'], 'src/mausweb/manage.py')
95
 
    proc = subprocess.Popen(
96
 
                       ['python', maus_web, 'runserver', 'localhost:9000'],
97
 
                       stdout=log, stderr=subprocess.STDOUT)
98
 
    print 'with pid', proc.pid # pylint: disable = E1101
 
153
    proc = OnlineProcess(['python', maus_web, 'runserver', 'localhost:9000'],
 
154
                         maus_web_log_file_name)
99
155
    return proc
100
156
 
101
157
def maus_input_transform_process(maus_input_log, _extra_args):
103
159
    Open the input transform process - runs against data and performs
104
160
    reconstruction, leaving reconstructed data in a database somewhere.
105
161
    """
106
 
    print 'Starting reconstruction with log file ', maus_input_log,
107
 
    log = open(maus_input_log, 'w')
 
162
    print 'Starting input-transform...',
108
163
    maus_inp = \
109
164
             os.path.join(os.environ['MAUS_ROOT_DIR'],
110
165
                          'bin/online/analyze_data_online_input_transform.py')
111
 
    proc = subprocess.Popen(
112
 
                       ['python', maus_inp, '-mongodb_database_name='+MONGODB,
113
 
                        '-type_of_dataflow=multi_process_input_transform',
114
 
                                        '-verbose_level=0',
115
 
                                                '-DAQ_hostname=miceraid5']+_extra_args,
116
 
                       stdout=log, stderr=subprocess.STDOUT)
117
 
    print 'with pid', proc.pid # pylint: disable = E1101
 
166
    proc = OnlineProcess(['python', maus_inp,
 
167
                          '-mongodb_database_name='+MONGODB,
 
168
                          '-type_of_dataflow=multi_process_input_transform',
 
169
                                                  '-verbose_level=0',
 
170
                                                              '-DAQ_hostname=miceraid5']+_extra_args,
 
171
                          maus_input_log)
118
172
    return proc
119
173
    
120
174
def maus_merge_output_process(maus_output_log, reducer_name, output_name,
123
177
    Open the merge output process - runs against reconstructed data and collects
124
178
    into a bunch of histograms.
125
179
    """
126
 
    print 'Starting reconstruction with log file ', maus_output_log,
127
 
    log = open(maus_output_log, 'w')
 
180
    print 'Starting reducer...',
128
181
    maus_red = os.path.join(os.environ['MAUS_ROOT_DIR'], 'bin/online',
129
182
                                                                   reducer_name)
130
 
    proc = subprocess.Popen(
131
 
                       ['python', maus_red, '-mongodb_database_name='+MONGODB,
132
 
                        '-type_of_dataflow=multi_process_merge_output',
133
 
                        '-output_json_file_name='+output_name,
134
 
                        '-reduce_plot_refresh_rate=60']+_extra_args,
135
 
                        stdout=log, stderr=subprocess.STDOUT)
136
 
    print 'with pid', proc.pid # pylint: disable = E1101
 
183
    # strip trailing .py or whatever
 
184
    root_name = string.join(reducer_name.split('.')[0:-1], '.')
 
185
    if root_name == '':
 
186
        root_name = reducer_name
 
187
    root_name += '.root'
 
188
    proc = OnlineProcess(['python', maus_red,
 
189
                          '-mongodb_database_name='+MONGODB,
 
190
                          '-type_of_dataflow=multi_process_merge_output',
 
191
                          '-output_json_file_name='+output_name,
 
192
                          '-reduce_plot_refresh_rate=60',
 
193
                          '-output_root_file_name='+root_name,
 
194
                          '-output_root_file_mode=end_of_run_file_per_run']+\
 
195
                          _extra_args,
 
196
                          maus_output_log)
137
197
    return proc
138
198
 
139
199
def monitor_mongodb(url, database_name, file_handle):
210
270
        os.kill(pid, signal.SIGKILL)
211
271
        print "Killed", pid
212
272
 
 
273
def remove_lockfile():
 
274
    """
 
275
    Delete the lockfile
 
276
    """
 
277
    if os.path.exists(LOCKFILE):
 
278
        os.remove(LOCKFILE)
 
279
        print 'Cleared lockfile'
 
280
    else:
 
281
        print 'Strange, I lost the lockfile...'
 
282
 
213
283
def clear_lockfile():
214
284
    """
215
285
    Clear an existing lockfile
250
320
    fout = open(LOCKFILE, 'w')
251
321
    print >> fout, os.getpid()
252
322
    for proc in _procs  :
253
 
        print >> fout, proc.pid
 
323
        print >> fout, proc.subproc.pid
254
324
    fout.close()
255
325
 
256
326
def cleanup(_procs):
257
327
    """
258
 
    Kill any subprocesses of this process
 
328
    Kill any subprocesses in _procs list of OnlineProcesses
259
329
    """
260
330
    returncode = 0
261
 
    print 'Exiting... killing all MAUS processes'
262
 
    for process in _procs:
 
331
    for online_process in _procs:
 
332
        process = online_process.subproc
263
333
        if process.poll() == None:
264
334
            print 'Attempting to kill process', str(process.pid)
265
335
            process.send_signal(signal.SIGINT)
266
336
    while len(_procs) > 0:
267
337
        _proc_alive = []
268
 
        for process in _procs:
 
338
        for online_process in _procs:
 
339
            process = online_process.subproc
269
340
            print 'Polling process', process.pid,
270
341
            if process.poll() == None:
271
342
                print '... process did not die - it is still working '+\
272
343
                      '(check the log file)'
273
 
                _proc_alive.append(process)
 
344
                _proc_alive.append(online_process)
274
345
            else:
275
346
                print '... process', str(process.pid), \
276
347
                      'is dead with return code', str(process.returncode)
278
349
        sys.stdout.flush()
279
350
        _procs = _proc_alive
280
351
        time.sleep(10)
281
 
    if os.path.exists(LOCKFILE):
282
 
        os.remove(LOCKFILE)
283
 
        print 'Cleared lockfile'
284
 
    else:
285
 
        print 'Strange, I lost the lockfile...'
286
352
    return returncode
287
353
 
288
354
def main():
314
380
            reduce_log = os.path.join(log_dir, reducer[0:-3]+'.log')
315
381
            PROCESSES.append(maus_merge_output_process(reduce_log,
316
382
                                              reducer, debug_json, extra_args))
317
 
 
318
383
        make_lockfile(PROCESSES)
319
384
        print '\nCTRL-C to quit\n'
320
385
        mongo_log = open(os.path.join(log_dir, 'mongodb.log'), 'w')
330
395
        returncode = 1
331
396
    finally:
332
397
        returncode = cleanup(PROCESSES)+returncode
 
398
        remove_lockfile()
333
399
        sys.exit(returncode)
334
400
 
335
401
if __name__ == "__main__":