#!/usr/bin/env python
# File created on 07 Jul 2012
from __future__ import division
from math import ceil
from os import popen, makedirs
from os.path import split, splitext, join
from random import choice
from cogent.parse.fasta import MinimalFastaParser
from qiime.util import (load_qiime_config,
                        get_qiime_scripts_dir,
                        count_seqs,
                        qiime_system_call)

__author__ = "Greg Caporaso"
__copyright__ = "Copyright 2011, The QIIME Project"
__credits__ = ["Greg Caporaso", "Jens Reeder", "Jai Ram Rideout"]
__maintainer__ = "Greg Caporaso"
__email__ = "gregcaporaso@gmail.com"
__status__ = "Release"
qiime_config = load_qiime_config()

RANDOM_JOB_PREFIX_CHARS = "abcdefghijklmnopqrstuvwxyz"
RANDOM_JOB_PREFIX_CHARS += RANDOM_JOB_PREFIX_CHARS.upper()
RANDOM_JOB_PREFIX_CHARS += "0123456789"

class ParallelWrapper(object):

    def __init__(self,
                 python_exe_fp=qiime_config['python_exe_fp'],
                 cluster_jobs_fp=qiime_config['cluster_jobs_fp'],
                 jobs_to_start=int(qiime_config['jobs_to_start']),
                 poller_fp=join(get_qiime_scripts_dir(),'poller.py'),
                 retain_temp_files=False,
                 suppress_polling=False,
                 seconds_to_sleep=int(qiime_config['seconds_to_sleep'])):
        self._python_exe_fp = python_exe_fp
        self._cluster_jobs_fp = cluster_jobs_fp
        self._jobs_to_start = jobs_to_start
        self._poller_fp = poller_fp
        self._retain_temp_files = retain_temp_files
        self._suppress_polling = suppress_polling
        self._seconds_to_sleep = seconds_to_sleep

    def _call_initialization(self,
                             input_fp,
                             output_dir,
                             params,
                             job_prefix,
                             poll_directly,
                             suppress_submit_jobs):
        """ Called as the first step in __call__.
        """
        pass

    def _call_cleanup(self,
                      input_fp,
                      output_dir,
                      params,
                      job_prefix,
                      poll_directly,
                      suppress_submit_jobs):
        """ Called as the last step in __call__.
        """
        pass

    def __call__(self,
                 input_fp,
                 output_dir,
                 params,
                 job_prefix=None,
                 poll_directly=False,
                 suppress_submit_jobs=False):
        ## Generate a list of files and directories that will need to be cleaned up
        self.files_to_remove = []

        # Perform any method-specific setup. This should prevent the need to
        # overwrite __call__ in subclasses.
        self._call_initialization(input_fp,
                                  output_dir,
                                  params,
                                  job_prefix,
                                  poll_directly,
                                  suppress_submit_jobs)

        # split the input filepath into directory and filename, base filename and
        # extension for use in naming other files
        try:
            input_dir, input_fn = split(input_fp)
            input_file_basename, input_ext = splitext(input_fn)
        except AttributeError:
            ## THIS IS AWFUL - SOME OF THE SCRIPTS PASS A LIST, SO THE
            ## PREVIOUS BLOCK WON'T WORK... WHAT DO WE WANT TO DO?
            input_dir, input_fn = split(input_fp[0])
            input_file_basename, input_ext = splitext(input_fn)

        # Allow the user to override the default job_prefix (defined by the
        # subclass)
        if job_prefix is None:
            job_prefix = self._get_random_job_prefix(self._job_prefix)
        # A temporary output directory is created in output_dir named
        # job_prefix. Output files are then moved from the temporary
        # directory to the output directory when they are complete,
        # allowing a poller to detect when runs complete by the presence
        # of their output files.
        working_dir = join(output_dir,job_prefix)
        try:
            makedirs(working_dir)
            self.files_to_remove.append(working_dir)
        except OSError:
            # working dir already exists
            pass

        # Split the input file into the individual job input files. Add the
        # individual job files to the files_to_remove list
        input_fps, remove_input_on_completion = self._input_splitter(
                                                 input_fp,
                                                 params,
                                                 self._jobs_to_start,
                                                 job_prefix,
                                                 working_dir)
        if remove_input_on_completion:
            self.files_to_remove += input_fps

        # Perform any method-specific setup (e.g., formatting a BLAST database)
        self._precommand_initiation(input_fp,output_dir,working_dir,params)

        # Generate the list of commands to be pushed out to workers
        # and the list of output files generated by each job.
        commands, job_result_filepaths = self._get_job_commands(input_fps,
                                                                output_dir,
                                                                params,
                                                                job_prefix,
                                                                working_dir)
        self.files_to_remove += \
         self._identify_files_to_remove(job_result_filepaths,params)

        # Generate the output clean-up files
        merge_map_filepath, deletion_list_filepath, expected_files_filepath =\
         self._initialize_output_cleanup_files(job_result_filepaths,
                                               output_dir,
                                               working_dir,
                                               input_file_basename,
                                               params)

        # Set up poller apparatus if the user does not suppress polling
        if not self._suppress_polling:
            poller_command = self._initiate_polling(job_result_filepaths,
                                                    working_dir,
                                                    poll_directly,
                                                    merge_map_filepath,
                                                    deletion_list_filepath,
                                                    expected_files_filepath)

        # If the poller should be run in the same way as the other commands
        # (rather than by the current process), add it to the list of commands
        if not poll_directly:
            commands.append(poller_command)

        # Build the filepath for the 'jobs script'. Add that file to the
        # files_to_remove list.
        jobs_fp = join(working_dir,job_prefix + 'jobs.txt')
        self._write_jobs_file(commands,jobs_fp)
        self.files_to_remove.append(jobs_fp)

        # submit the jobs file using cluster_jobs, if not suppressed by the
        # user
        if not suppress_submit_jobs:
            stdout, stderr, return_value = self._submit_jobs(
             jobs_fp=jobs_fp, job_prefix=job_prefix)

        # If the poller is going to be run by the current process,
        # start polling now.
        if poll_directly:
            # IMPORTANT: the following line MUST use qiime_system_call()
            # instead of subprocess.call, .check_call, or .check_output in case
            # we are invoked in a child process with PIPEs (a deadlock will
            # occur otherwise). This can happen if this code is tested by
            # all_tests.py, for example.
            stdout, stderr, return_value = qiime_system_call(poller_command)
            if return_value != 0:
                print '**Error occurred when calling the poller directly. '+\
                'Jobs may have been submitted, but are not being polled.'
                print stderr

        self.files_to_remove = []

        # Perform any method-specific cleanup. This should prevent the need to
        # overwrite __call__ in subclasses.
        self._call_cleanup(input_fp,
                           output_dir,
                           params,
                           job_prefix,
                           poll_directly,
                           suppress_submit_jobs)

    def _initialize_output_cleanup_files(self,
                                         job_result_filepaths,
                                         output_dir,
                                         working_dir,
                                         input_file_basename,
                                         params):
        # Write the mapping file which describes how the output files from
        # each job should be merged into the final output files
        merge_map_filepath = '%s/merge_map.txt' % working_dir
        self._write_merge_map_file(input_file_basename,
                                   job_result_filepaths,
                                   params,
                                   output_dir,
                                   merge_map_filepath)
        self.files_to_remove.append(merge_map_filepath)

        # Create the filepath listing the temporary files to be deleted,
        # but don't write it yet
        deletion_list_filepath = '%s/deletion_list.txt' % working_dir
        self.files_to_remove.append(deletion_list_filepath)

        # Write the list of files which must exist for the jobs to be
        # considered complete
        expected_files_filepath = '%s/expected_out_files.txt' % working_dir
        self._write_filepaths_to_file(job_result_filepaths,
                                      expected_files_filepath)
        self.files_to_remove.append(expected_files_filepath)

        return (merge_map_filepath,
                deletion_list_filepath,
                expected_files_filepath)

    def _submit_jobs(self,
                     jobs_fp,
                     job_prefix):
        """ Submit the jobs to the queue using cluster_jobs.py
        """
        cmd = '%s -ms %s %s' % (self._cluster_jobs_fp,
                                jobs_fp,
                                job_prefix)
        stdout, stderr, return_value = qiime_system_call(cmd)
        if return_value != 0:
            msg = "\n\n*** Could not start parallel jobs. \n" +\
             "Command run was:\n %s\n" % cmd +\
             "Command returned exit status: %d\n" % return_value +\
             "Stdout:\n%s\nStderr\n%s\n" % (stdout,stderr)
            raise RuntimeError, msg

        # Leave these comments in as they're useful for debugging.
        # print 'Return value: %d\n' % return_value
        # print 'STDOUT: %s\n' % stdout
        # print 'STDERR: %s\n' % stderr

        return stdout, stderr, return_value

    def _identify_files_to_remove(self,job_result_filepaths,params):
        """ Select the files to remove: by default remove all files
        """
        return job_result_filepaths

    def _precommand_initiation(self,input_fp,output_dir,working_dir,params):
        """ Hook for method-specific setup: no-op by default """
        pass

    def _write_merge_map_file(self,
                              input_file_basename,
                              job_result_filepaths,
                              params,
                              output_dir,
                              merge_map_filepath,
                              failures=False):
        """ Create an empty file by default. Most subclasses will overwrite this
        """
        open(merge_map_filepath,'w').close()

    def _get_random_job_prefix(self,
                               fixed_prefix='',
                               max_job_prefix_len=10,
                               leading_trailing_underscores=True):
        """ Return a string to use as job prefix """
        length = max_job_prefix_len - len(fixed_prefix)
        if leading_trailing_underscores:
            length -= 2

        result = [choice(RANDOM_JOB_PREFIX_CHARS) for i in range(length)]
        if leading_trailing_underscores:
            return fixed_prefix + '_' + ''.join(result) + '_'
        else:
            return fixed_prefix + ''.join(result)

    def _get_job_commands(self,
                          input_fps,
                          output_dir,
                          params,
                          job_prefix,
                          working_dir,
                          command_prefix='/bin/bash; ',
                          command_suffix='; exit'):
        raise NotImplementedError, "Subclass must override _get_job_commands"

    def _initiate_polling(self,
                          job_result_filepaths,
                          working_dir,
                          poll_directly,
                          merge_map_filepath,
                          deletion_list_filepath,
                          expected_files_filepath):
        # Generate the command to run the poller, and the list of temp files
        # created by the poller
        if not poll_directly:
            poller_command, poller_result_filepaths =\
             self._get_poller_command(expected_files_filepath,
                                      merge_map_filepath,
                                      deletion_list_filepath)
        else:
            # this 'else' exists only because of the command_prefix/suffix
            # handling, which needs to be refactored (related to trac #109)
            poller_command, poller_result_filepaths =\
             self._get_poller_command(expected_files_filepath,
                                      merge_map_filepath,
                                      deletion_list_filepath,
                                      command_prefix='',
                                      command_suffix='')
        self.files_to_remove += poller_result_filepaths

        if not self._retain_temp_files:
            # If the user wants temp files deleted, now write the list of
            # temp files to be deleted
            self._write_filepaths_to_file(self.files_to_remove,
                                          deletion_list_filepath)
        else:
            # Otherwise just write an empty file
            self._write_filepaths_to_file([],
                                          deletion_list_filepath)

        return poller_command

    def _get_poller_command(self,
                            expected_files_filepath,
                            merge_map_filepath,
                            deletion_list_filepath,
                            command_prefix='/bin/bash; ',
                            command_suffix='; exit'):
        """Generate command to initiate a poller to monitor/process completed runs
        """
        result = '%s poller.py -f %s -m %s -d %s -t %d %s' % \
         (command_prefix,
          expected_files_filepath,
          merge_map_filepath,
          deletion_list_filepath,
          self._seconds_to_sleep,
          command_suffix)
        # The poller's working files are already tracked in files_to_remove,
        # so no additional result filepaths are returned here.
        return result, []

    def _write_filepaths_to_file(self,
                                 job_result_filepaths,
                                 expected_files_filepath):
        f = open(expected_files_filepath,'w')
        f.write('\n'.join(job_result_filepaths))
        f.write('\n')
        f.close()

    def _get_rename_command(self,
                            out_filenames,
                            tmp_output_dir,
                            output_dir):
        """Generate commands to move out_filenames from tmp_output_dir to output_dir
        """
        result = ''
        result_filepaths = []
        for fn in out_filenames:
            tmp_result_filepath = '%s/%s' % (tmp_output_dir,fn)
            result_filepath = '%s/%s' % (output_dir,fn)
            result += \
             '; mv %s %s' % (tmp_result_filepath,result_filepath)
            result_filepaths.append(result_filepath)
        return result, result_filepaths

    def _write_jobs_file(self,
                         commands,
                         jobs_fp):
        """ Write commands to jobs_fp and return jobs_fp """
        open(jobs_fp,'w').write('\n'.join(commands))
        return jobs_fp

    def _merge_to_n_commands(self,
                             commands,
                             n,
                             delimiter=' ; ',
                             command_prefix=None,
                             command_suffix=None):
        """ merge a list of commands into n commands

            This is used by parallel wrappers such as alpha_diversity and
            beta_diversity which perform an operation on a collection of
            input files (as opposed to the scripts that split an input file
            into the user-specified number of jobs).
        """
        if n < 1:
            raise ValueError, "number of commands (n) must be an integer >= 1"

        # Build the list of prefix/suffix subcommands that should be
        # filtered out of the individual commands before merging
        commands_to_filter = []
        if command_prefix is None:
            command_prefix = '/bin/bash ;'
            commands_to_filter.append('/bin/bash')
        else:
            commands_to_filter += [c.strip() for c in command_prefix.split(';') if c.strip()]

        if command_suffix is None:
            command_suffix = '; exit'
            commands_to_filter.append('exit')
        else:
            commands_to_filter += [c.strip() for c in command_suffix.split(';') if c.strip()]

        result = []
        commands_per_merged_command = int(ceil((len(commands)/n)))
        # begin iterating through the commands
        current_cmds = []
        cmd_counter = 0
        for command in commands:
            subcommands = [c.strip() for c in command.split(';')]
            current_cmds.append(delimiter.join([s for s in subcommands if s not in commands_to_filter]))
            cmd_counter += 1
            if cmd_counter == commands_per_merged_command:
                result.append(delimiter.join(current_cmds))
                current_cmds = []
                cmd_counter = 0

        # merge any remaining commands into the last command
        if current_cmds:
            result[-1] = delimiter.join([result[-1]] + current_cmds)

        for i,r in enumerate(result):
            r = '%s %s %s' % (command_prefix, r, command_suffix)
            result[i] = r.strip()

        return result

    def _compute_seqs_per_file(self,
                               input_fasta_fp,
                               num_jobs_to_start):
        """ Compute the number of sequences to include in each split file
        """
        # count the number of sequences in the fasta file
        num_input_seqs = count_seqs(input_fasta_fp)[0]

        # divide the number of sequences by the number of jobs to start
        result = num_input_seqs/num_jobs_to_start

        # if we don't have a perfect split, round up
        if result % 1 != 0:
            result += 1

        # return the result as an integer
        return int(result)

    # General purpose _input_splitter functions

    def _split_fasta(self,
                     input_fp,
                     params,
                     jobs_to_start,
                     job_prefix,
                     output_dir):
        # compute the number of sequences that should be included in
        # each file after splitting the input fasta file
        num_seqs_per_file = self._compute_seqs_per_file(input_fp,jobs_to_start)

        # split the fasta files and get the list of resulting files
        tmp_fasta_fps =\
         split_fasta(open(input_fp),num_seqs_per_file,
                     job_prefix,working_dir=output_dir)

        return tmp_fasta_fps, True

    def _input_existing_filepaths(self,
                                  input_fps,
                                  params,
                                  jobs_to_start,
                                  job_prefix,
                                  output_dir):
        return input_fps, False
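
# A minimal subclassing sketch (hypothetical; the real wrappers live in the
# qiime.parallel.* modules). A subclass defines _job_prefix, picks one of the
# general purpose _input_splitter functions, and overrides _get_job_commands
# to build one command line per split input file:
#
#   class ParallelHeadTrimmer(ParallelWrapper):
#       _job_prefix = 'TRIM'
#       _input_splitter = ParallelWrapper._split_fasta
#
#       def _get_job_commands(self, input_fps, output_dir, params,
#                             job_prefix, working_dir,
#                             command_prefix='/bin/bash; ',
#                             command_suffix='; exit'):
#           commands = []
#           job_result_filepaths = []
#           for i, input_fp in enumerate(input_fps):
#               out_fp = '%s/%s.%d.out.txt' % (working_dir, job_prefix, i)
#               commands.append('%s head -n %d %s > %s %s' %
#                (command_prefix, params['n'], input_fp, out_fp,
#                 command_suffix))
#               job_result_filepaths.append(out_fp)
#           return commands, job_result_filepaths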

class BufferedWriter():
    """A file-like object that delays writing to file without keeping an open filehandle

    This class is useful in scenarios where potentially many open filehandles
    are needed (e.g. during splitting of inputs for parallelization). Since
    each OS limits the max number of filehandles open at any time, we provide
    a class that can be used much like a regular (writable) filehandle, but
    without keeping the filehandle open permanently. Using a larger buffer
    size speeds up the program by using fewer of the expensive open/close
    IO operations.
    """

    def __init__(self, filename, buf_size=100):
        """
        filename: name of file to write to in append mode

        buf_size: buffer size in chunks. Each write operation counts as one chunk.
        """
        if buf_size < 1:
            raise ValueError("Invalid buf_size. Must be 1 or larger.")

        self.buffer = []
        self.buf_size = buf_size
        self.filename = filename

        # create (or truncate) the file before the first buffered write
        fh = open(self.filename, "w")
        fh.close()

    def __del__(self):
        self._flush()

    def close(self):
        self._flush()

    def write(self, line):
        """write line to BufferedWriter"""
        self.buffer.append(line)
        if (len(self.buffer) > self.buf_size):
            self._flush()

    def _flush(self):
        """Write buffer to file"""
        fh = open(self.filename, "a")
        fh.write("".join(self.buffer))
        fh.close()
        self.buffer = []
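
# Usage sketch (hypothetical filename). Writes are buffered, so close() (or
# object deletion) is what flushes the tail of the buffer to disk:
#
#   out = BufferedWriter('seqs.0.fasta', buf_size=500)
#   out.write('>seq1\nACCGT\n')
#   out.close()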

# Module-level helper functions

def split_fasta(infile, seqs_per_file, outfile_prefix, working_dir=''):
    """ Split infile into files with seqs_per_file sequences in each

        infile: list of fasta lines or open file object
        seqs_per_file: the number of sequences to include in each file
        outfile_prefix: string used to create output filepath - output filepaths
         are <outfile_prefix>.<i>.fasta where i runs from 0 to number of output files
        working_dir: directory to prepend to temp filepaths (defaults to
         empty string -- files written to cwd)

        List of output filepaths is returned.
    """
    if working_dir and not working_dir.endswith('/'):
        working_dir += '/'

    seq_counter = 0
    out_files = []
    for seq_id,seq in MinimalFastaParser(infile):
        if seq_counter == 0:
            current_out_fp = '%s%s.%d.fasta' \
             % (working_dir,outfile_prefix,len(out_files))
            current_out_file = open(current_out_fp, 'w')
            out_files.append(current_out_fp)
        current_out_file.write('>%s\n%s\n' % (seq_id, seq))
        seq_counter += 1

        if seq_counter == seqs_per_file:
            current_out_file.close()
            seq_counter = 0

    if out_files and not current_out_file.closed:
        current_out_file.close()

    return out_files
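
# Example (hypothetical paths): splitting a 10-sequence FASTA file into
# chunks of 3 sequences yields four files, with the last file holding the
# single leftover sequence:
#
#   out_fps = split_fasta(open('inseqs.fasta'), 3, 'inseqs', working_dir='/tmp')
#   # out_fps == ['/tmp/inseqs.0.fasta', '/tmp/inseqs.1.fasta',
#   #             '/tmp/inseqs.2.fasta', '/tmp/inseqs.3.fasta']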

def merge_to_n_commands(commands,n,delimiter=' ; ',
                        command_prefix=None,command_suffix=None):
    """ merge a list of commands into n commands """
    if n < 1:
        raise ValueError, "n must be an integer >= 1"

    if command_prefix is None:
        command_prefix = '/bin/bash ;'
    if command_suffix is None:
        command_suffix = '; exit'

    result = []
    commands_per_merged_command = int(ceil((len(commands)/n)))
    # begin iterating through the commands
    current_cmds = []
    cmd_counter = 0
    for command in commands:
        current_cmds.append(command)
        cmd_counter += 1
        if cmd_counter == commands_per_merged_command:
            result.append(delimiter.join(current_cmds))
            current_cmds = []
            cmd_counter = 0

    # merge any remaining commands into the last command
    if current_cmds:
        result[-1] = delimiter.join([result[-1]] + current_cmds)

    for i,r in enumerate(result):
        r = '%s %s %s' % (command_prefix, r, command_suffix)
        result[i] = r.strip()

    return result
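
# Example: merging four commands into two jobs of two commands each
# (default prefix/suffix shown):
#
#   merge_to_n_commands(['cmd_a', 'cmd_b', 'cmd_c', 'cmd_d'], 2)
#   # -> ['/bin/bash ; cmd_a ; cmd_b ; exit',
#   #     '/bin/bash ; cmd_c ; cmd_d ; exit']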

def get_random_job_prefix(fixed_prefix='',max_job_prefix_len=10,
                          leading_trailing_underscores=True):
    """ Return a string to use as job prefix """
    length = max_job_prefix_len - len(fixed_prefix)
    if leading_trailing_underscores:
        length -= 2

    result = [choice(RANDOM_JOB_PREFIX_CHARS) for i in range(length)]
    if leading_trailing_underscores:
        return fixed_prefix + '_' + ''.join(result) + '_'
    else:
        return fixed_prefix + ''.join(result)
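
# Example: get_random_job_prefix('RDP') might return 'RDP_k27Fp_' -- the
# three-character fixed prefix plus five random characters and two
# underscores, for max_job_prefix_len=10 characters total.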

def write_jobs_file(commands,job_prefix=None,jobs_fp=None):
    """ Write commands to jobs_fp and return jobs_fp """
    if not jobs_fp:
        if job_prefix:
            jobs_fp = job_prefix + 'jobs.txt'
        else:
            jobs_fp = 'jobs.txt'

    open(jobs_fp,'w').write('\n'.join(commands))
    return jobs_fp

def submit_jobs(path_to_cluster_jobs, jobs_fp, job_prefix):
    """ Submit the jobs to the queue using cluster_jobs.py
    """
    cmd = '%s -ms %s %s' % (path_to_cluster_jobs, jobs_fp, job_prefix)
    stdout, stderr, return_value = qiime_system_call(cmd)
    if return_value != 0:
        msg = "\n\n*** Could not start parallel jobs. \n" +\
         "Command run was:\n %s\n" % cmd +\
         "Command returned exit status: %d\n" % return_value +\
         "Stdout:\n%s\nStderr\n%s\n" % (stdout,stderr)
        raise RuntimeError, msg

    # Leave these comments in as they're useful for debugging.
    # print 'Return value: %d\n' % return_value
    # print 'STDOUT: %s\n' % stdout
    # print 'STDERR: %s\n' % stderr
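
# Example pipeline (hypothetical paths): write the merged commands to a jobs
# file, then hand that file to the configured cluster-jobs script, which is
# invoked as: <path_to_cluster_jobs> -ms <jobs_fp> <job_prefix>
#
#   jobs_fp = write_jobs_file(commands, job_prefix='RDP_k27Fp_')
#   submit_jobs('/usr/local/bin/start_parallel_jobs.py', jobs_fp, 'RDP_k27Fp_')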

def compute_seqs_per_file(input_fasta_fp,num_jobs_to_start):
    """ Compute the number of sequences to include in each split file
    """
    # count the number of sequences in the fasta file
    num_input_seqs = \
     int(popen("sed -n -e '/^>/p' %s | wc -l" % input_fasta_fp).read().strip())

    # divide the number of sequences by the number of jobs to start
    result = num_input_seqs/num_jobs_to_start

    # if we don't have a perfect split, round up
    if result % 1 != 0:
        result += 1

    # return the result as an integer
    return int(result)
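
# Worked example: 1000 input sequences across 24 jobs gives
# 1000/24 = 41.67 sequences per file, which rounds up to 42, so the first
# 23 files get 42 sequences each and the last file gets the remaining 34.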

def build_filepaths_from_filepaths(filepaths,prefix='',directory='',
                                   suffix='',replacement=('','')):
    """ Returns a modified list of filepaths

        Modifications to each filepath, in order, are:
         1- strip path to convert filepath to filename
             (e.g., mydir/out.txt => out.txt)
         2- replace all occurrences of replacement[0] with replacement[1]
         3- join directory, prefix, filename, suffix

        Order of results corresponds to order of input.
    """
    if directory and not directory.endswith('/'):
        directory += '/'

    results = []
    replace_from = replacement[0]
    replace_to = replacement[1]

    for fp in filepaths:
        file_dir, filename = split(fp)
        filename = filename.replace(replace_from,replace_to)
        result = '%s%s%s%s' % (directory,prefix,filename,suffix)
        results.append(result)

    return results
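
# Example:
#   build_filepaths_from_filepaths(['mydir/out.txt'], prefix='s1_',
#                                  directory='results', suffix='.gz',
#                                  replacement=('out', 'table'))
#   # -> ['results/s1_table.txt.gz']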

def get_poller_command(python_exe_fp,poller_fp,expected_files_filepath,
                       merge_map_filepath,deletion_list_filepath,
                       seconds_to_sleep,
                       command_prefix='/bin/bash; ',command_suffix='; exit'):
    """Generate command to initiate a poller to monitor/process completed runs
    """
    result = '%s %s %s -f %s -m %s -d %s -t %d %s' % \
     (command_prefix,
      python_exe_fp,
      poller_fp,
      expected_files_filepath,
      merge_map_filepath,
      deletion_list_filepath,
      seconds_to_sleep,
      command_suffix)
    return result

def get_rename_command(out_filenames,tmp_output_dir,output_dir):
    """Generate commands to move out_filenames from tmp_output_dir to output_dir
    """
    result = ''
    result_filepaths = []
    for fn in out_filenames:
        tmp_result_filepath = '%s/%s' % (tmp_output_dir,fn)
        result_filepath = '%s/%s' % (output_dir,fn)
        result += \
         '; mv %s %s' % (tmp_result_filepath,result_filepath)
        result_filepaths.append(result_filepath)
    return result, result_filepaths

def write_filepaths_to_file(job_result_filepaths,expected_files_filepath):
    f = open(expected_files_filepath,'w')
    f.write('\n'.join(job_result_filepaths))
    f.write('\n')
    f.close()
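
# Example: get_rename_command(['a.txt'], '/tmp/RDP_', '/home/out') returns
# ('; mv /tmp/RDP_/a.txt /home/out/a.txt', ['/home/out/a.txt']) -- the
# command string is built to be appended to an existing shell command,
# hence the leading '; '.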

def write_merge_map_file_align_seqs(job_result_filepaths,output_dir,
                                    merge_map_filepath,input_file_basename):
    f = open(merge_map_filepath,'w')

    out_filepaths = ['%s/%s_aligned.fasta' % (output_dir,input_file_basename),
                     '%s/%s_failures.fasta' % (output_dir,input_file_basename),
                     '%s/%s_log.txt' % (output_dir,input_file_basename)]

    aligned_fps = []
    failures_fps = []
    log_fps = []

    for fp in job_result_filepaths:
        if fp.endswith('_aligned.fasta'):
            aligned_fps.append(fp)
        elif fp.endswith('_failures.fasta'):
            failures_fps.append(fp)
        else:
            log_fps.append(fp)

    for in_files, out_file in\
     zip([aligned_fps,failures_fps,log_fps],out_filepaths):
        f.write('\t'.join(in_files + [out_file]))
        f.write('\n')

    f.close()

def write_merge_map_file_pick_otus(job_result_filepaths,output_dir,
                                   merge_map_filepath,input_file_basename,
                                   failures=False):
    f = open(merge_map_filepath,'w')

    otus_fps = []
    log_fps = []
    failures_fps = []

    if not failures:
        out_filepaths = [
         '%s/%s_otus.txt' % (output_dir,input_file_basename),
         '%s/%s_otus.log' % (output_dir,input_file_basename)]
        in_filepaths = [otus_fps,log_fps]
    else:
        out_filepaths = [
         '%s/%s_otus.txt' % (output_dir,input_file_basename),
         '%s/%s_otus.log' % (output_dir,input_file_basename),
         '%s/%s_failures.txt' % (output_dir,input_file_basename)]
        in_filepaths = [otus_fps,log_fps,failures_fps]

    for fp in job_result_filepaths:
        if fp.endswith('_otus.txt'):
            otus_fps.append(fp)
        elif fp.endswith('_otus.log'):
            log_fps.append(fp)
        elif fp.endswith('_failures.txt'):
            failures_fps.append(fp)

    for in_files, out_file in\
     zip(in_filepaths,out_filepaths):
        f.write('\t'.join(in_files + [out_file]))
        f.write('\n')

    f.close()

def write_merge_map_file_assign_taxonomy(job_result_filepaths,output_dir,
                                         merge_map_filepath,input_file_basename):
    f = open(merge_map_filepath,'w')

    out_filepaths = [
     '%s/%s_tax_assignments.txt' % (output_dir,input_file_basename),
     '%s/%s_tax_assignments.log' % (output_dir,input_file_basename)]

    assignment_fps = []
    log_fps = []

    for fp in job_result_filepaths:
        if fp.endswith('_tax_assignments.txt'):
            assignment_fps.append(fp)
        else:
            log_fps.append(fp)

    for in_files, out_file in\
     zip([assignment_fps,log_fps],out_filepaths):
        f.write('\t'.join(in_files + [out_file]))
        f.write('\n')

    f.close()

def write_merge_map_file_blast(job_result_filepaths,output_dir,
                               merge_map_filepath,input_file_basename):
    f = open(merge_map_filepath,'w')
    out_filepath = \
     '%s/%s_blast_out.txt' % (output_dir,input_file_basename)
    f.write('\t'.join(job_result_filepaths + [out_filepath]))
    f.write('\n')
    f.close()

def write_merge_map_file_identify_chimeric_seqs(job_result_filepaths,output_dir,
                                                merge_map_filepath,
                                                input_file_basename,out_fp=None):
    f = open(merge_map_filepath,'w')

    if out_fp:
        out_filepaths = [out_fp]
    else:
        out_filepaths = [
         '%s/%s_chimeric.txt' % (output_dir,input_file_basename)]

    chims_fps = []
    logs_fps = [] # logs_fps is currently not used

    for fp in job_result_filepaths:
        if fp.endswith('_chimeric.txt'):
            chims_fps.append(fp)
        else:
            logs_fps.append(fp)

    for in_files, out_file in\
     zip([chims_fps],out_filepaths):
        f.write('\t'.join(in_files + [out_file]))
        f.write('\n')

    f.close()