47
import string # pylint: disable=W0402
49
50
MONGODB = 'maus-new' # no '.' character
50
51
LOCKFILE = os.path.join(os.environ['MAUS_ROOT_DIR'], 'tmp', '.maus_lockfile')
53
# maximum % of total memory usage before a process is restarted
56
# time between polls in seconds
59
# list of reducers to be used in the online job
54
61
'reconstruct_daq_scalars_reducer.py',
55
62
'reconstruct_daq_tof_reducer.py',
63
'reconstruct_daq_tofcalib_reducer.py',
56
64
'reconstruct_daq_ckov_reducer.py',
65
'reconstruct_daq_kl_reducer.py',
57
66
'reconstruct_monitor_reducer.py',
71
Wrapper for a subprocess POpen object
73
Wraps a subprocess with additional functionality to check memory usage and
74
kill the subprocess and restart it in the case of a leak. See #1328.
77
def __init__(self, subprocess_arg_list, log_file):
79
Set up the log file and start the subprocess
81
self.arg_list = subprocess_arg_list
82
self.log_name = log_file
83
self.log = open(log_file, "w")
89
Returns None if the process is running or the returncode if it finished
91
Checks memory footprint for the subprocess and restarts the process if
92
it exceeeds MAX_MEM_USAGE
95
poll_out = proc.poll()
97
mem_usage = self.memory_usage()
98
if mem_usage > MAX_MEM_USAGE:
99
print mem_usage, "% of memory used for process", proc.pid, \
100
"which exceeds the maximum", MAX_MEM_USAGE, \
101
"% - restarting the process"
103
self._start_process()
104
print str(proc.pid).rjust(6), str(mem_usage).ljust(6),
106
print '\nProcess', proc.pid, 'failed'
109
def memory_usage(self):
111
Return the memory usage (%) of the associated subprocess
113
ps_out = subprocess.check_output(['ps', '-p', str(self.subproc.pid),
115
return float(ps_out.strip(' \n'))
117
def _start_process(self):
121
self.subproc = subprocess.Popen(self.arg_list, \
122
stdout=self.log, stderr=subprocess.STDOUT)
123
print 'Started process with pid', self.subproc.pid, 'and log file', \
60
126
def poll_processes(proc_list):
62
128
Poll processes in process list. Return True if processes are all running,
78
139
Open the celery demon process - sets up workers for MAUS to reconstruct on
80
print 'Starting celery with log file ', celeryd_log_file_name,
81
log = open(celeryd_log_file_name, 'w')
82
proc = subprocess.Popen(['celeryd', '-c8', '-lINFO', '--purge'], \
83
stdout=log, stderr=subprocess.STDOUT)
84
print 'with pid', proc.pid # pylint: disable = E1101
141
print 'Starting celery... ',
142
proc = OnlineProcess(['celeryd', '-c8', '-lINFO', '--purge'],
143
celeryd_log_file_name)
87
146
def maus_web_app_process(maus_web_log_file_name):
89
148
Open the maus web app process - dynamically generates web pages for MAUS
92
print 'Starting maus web app with log file ', maus_web_log_file_name,
93
log = open(maus_web_log_file_name, 'w')
151
print 'Starting maus web app...',
94
152
maus_web = os.path.join(os.environ['MAUS_WEB_DIR'], 'src/mausweb/manage.py')
95
proc = subprocess.Popen(
96
['python', maus_web, 'runserver', 'localhost:9000'],
97
stdout=log, stderr=subprocess.STDOUT)
98
print 'with pid', proc.pid # pylint: disable = E1101
153
proc = OnlineProcess(['python', maus_web, 'runserver', 'localhost:9000'],
154
maus_web_log_file_name)
101
157
def maus_input_transform_process(maus_input_log, _extra_args):
103
159
Open the input transform process - runs against data and performs
104
160
reconstruction, leaving reconstructed data in a database somewhere.
106
print 'Starting reconstruction with log file ', maus_input_log,
107
log = open(maus_input_log, 'w')
162
print 'Starting input-transform...',
109
164
os.path.join(os.environ['MAUS_ROOT_DIR'],
110
165
'bin/online/analyze_data_online_input_transform.py')
111
proc = subprocess.Popen(
112
['python', maus_inp, '-mongodb_database_name='+MONGODB,
113
'-type_of_dataflow=multi_process_input_transform',
115
'-DAQ_hostname=miceraid5']+_extra_args,
116
stdout=log, stderr=subprocess.STDOUT)
117
print 'with pid', proc.pid # pylint: disable = E1101
166
proc = OnlineProcess(['python', maus_inp,
167
'-mongodb_database_name='+MONGODB,
168
'-type_of_dataflow=multi_process_input_transform',
170
'-DAQ_hostname=miceraid5']+_extra_args,
120
174
def maus_merge_output_process(maus_output_log, reducer_name, output_name,
123
177
Open the merge output process - runs against reconstructed data and collects
124
178
into a bunch of histograms.
126
print 'Starting reconstruction with log file ', maus_output_log,
127
log = open(maus_output_log, 'w')
180
print 'Starting reducer...',
128
181
maus_red = os.path.join(os.environ['MAUS_ROOT_DIR'], 'bin/online',
130
proc = subprocess.Popen(
131
['python', maus_red, '-mongodb_database_name='+MONGODB,
132
'-type_of_dataflow=multi_process_merge_output',
133
'-output_json_file_name='+output_name,
134
'-reduce_plot_refresh_rate=60']+_extra_args,
135
stdout=log, stderr=subprocess.STDOUT)
136
print 'with pid', proc.pid # pylint: disable = E1101
183
# strip trailing .py or whatever
184
root_name = string.join(reducer_name.split('.')[0:-1], '.')
186
root_name = reducer_name
188
proc = OnlineProcess(['python', maus_red,
189
'-mongodb_database_name='+MONGODB,
190
'-type_of_dataflow=multi_process_merge_output',
191
'-output_json_file_name='+output_name,
192
'-reduce_plot_refresh_rate=60',
193
'-output_root_file_name='+root_name,
194
'-output_root_file_mode=end_of_run_file_per_run']+\
139
199
def monitor_mongodb(url, database_name, file_handle):
250
320
fout = open(LOCKFILE, 'w')
251
321
print >> fout, os.getpid()
252
322
for proc in _procs :
253
print >> fout, proc.pid
323
print >> fout, proc.subproc.pid
256
326
def cleanup(_procs):
258
Kill any subprocesses of this process
328
Kill any subprocesses in _procs list of OnlineProcesses
261
print 'Exiting... killing all MAUS processes'
262
for process in _procs:
331
for online_process in _procs:
332
process = online_process.subproc
263
333
if process.poll() == None:
264
334
print 'Attempting to kill process', str(process.pid)
265
335
process.send_signal(signal.SIGINT)
266
336
while len(_procs) > 0:
268
for process in _procs:
338
for online_process in _procs:
339
process = online_process.subproc
269
340
print 'Polling process', process.pid,
270
341
if process.poll() == None:
271
342
print '... process did not die - it is still working '+\
272
343
'(check the log file)'
273
_proc_alive.append(process)
344
_proc_alive.append(online_process)
275
346
print '... process', str(process.pid), \
276
347
'is dead with return code', str(process.returncode)
314
380
reduce_log = os.path.join(log_dir, reducer[0:-3]+'.log')
315
381
PROCESSES.append(maus_merge_output_process(reduce_log,
316
382
reducer, debug_json, extra_args))
318
383
make_lockfile(PROCESSES)
319
384
print '\nCTRL-C to quit\n'
320
385
mongo_log = open(os.path.join(log_dir, 'mongodb.log'), 'w')