1
1
#!/usr/bin/env python
2
# Author: Jens Reeder (jens.reeder@gmail.com)
3
# identify_chimeric_seqs.py
5
2
from __future__ import division
6
from qiime.parallel.poller import basic_process_run_results_f
7
from qiime.parallel.util import get_rename_command
9
__author__ = "Greg Caporaso"
10
__copyright__ = "Copyright 2011, The QIIME Project"
11
__credits__ = ["Greg Caporaso","Jens Reeder"]
4
__author__ = "Jai Ram Rideout"
5
__copyright__ = "Copyright 2012, The QIIME project"
6
__credits__ = ["Jai Ram Rideout"]
14
__maintainer__ = "Greg Caporaso"
15
__email__ = "gregcaporaso@gmail.com"
9
__maintainer__ = "Jai Ram Rideout"
10
__email__ = "jai.rideout@gmail.com"
16
11
__status__ = "Release"
18
def get_job_commands(python_exe_fp, identify_chimeric_seqs_fp, fasta_fps,
19
output_dir, ref_seqs_fp, job_prefix, working_dir,
20
aligned_reference_seqs_fp, blast_db,
21
chimera_detection_method, min_div_ratio, num_fragments,
22
taxonomy_depth, max_e_value, id_to_taxonomy_fp,
23
command_prefix='', command_suffix=''):
24
# command_prefix='/bin/bash; ', command_suffix='; exit'):
25
"""Generate identify_chimeric_seqs commands which should be run
27
# Create basenames for each of the output files. These will be filled
28
# in to create the full list of files created by all of the runs.
29
out_filenames = [job_prefix + '.%d_chimeric.txt']
31
# Create lists to store the results
35
# Iterate over the input files
36
for i,fasta_fp in enumerate(fasta_fps):
37
# Each run ends with moving the output file from the tmp dir to
38
# the output_dir. Build the command to perform the move here.
39
rename_command, current_result_filepaths = get_rename_command(\
40
[fn % i for fn in out_filenames], working_dir, output_dir)
41
result_filepaths += current_result_filepaths
46
if chimera_detection_method=='blast_fragments':
49
optional_options += " -r %s" % ref_seqs_fp
51
optional_options += " -b %s" % blast_db
54
'%s %s %s -i %s -t %s -m blast_fragments -o %s -n %s -d %s -e %s %s %s %s' %\
57
identify_chimeric_seqs_fp,
60
working_dir+"/"+out_filenames[0] % i,
68
elif chimera_detection_method=='ChimeraSlayer':
70
optional_options += " --min_div_ratio %s" % min_div_ratio
72
optional_options += " -r %s" % ref_seqs_fp
74
'%s %s %s -i %s -a %s -m ChimeraSlayer -o %s %s %s %s' %\
77
identify_chimeric_seqs_fp,
79
aligned_reference_seqs_fp,
80
working_dir+"/"+out_filenames[0] % i,
13
from os.path import split
14
from shutil import copy
15
from cogent.app.formatdb import build_blast_db_from_fasta_path
16
from cogent.parse.fasta import MinimalFastaParser
17
from qiime.identify_chimeric_seqs import make_cidx_file
18
from qiime.parse import parse_tmp_to_final_filepath_map_file
19
from qiime.util import write_degapped_fasta_to_file
21
from qiime.parallel.util import ParallelWrapper
23
class ParallelChimericSequenceIdentifier(ParallelWrapper):
24
_script_name = 'identify_chimeric_seqs.py'
25
_input_splitter = ParallelWrapper._split_fasta
27
_process_run_results_f = \
28
'qiime.parallel.identify_chimeric_seqs.basic_process_run_results_f'
30
def _precommand_initiation(self, input_fp, output_dir, working_dir,
32
if params['chimera_detection_method'] == 'blast_fragments':
33
blast_db, db_files_to_remove = \
34
build_blast_db_from_fasta_path(params['reference_seqs_fp'],
35
output_dir=working_dir)
36
self.files_to_remove += db_files_to_remove
37
params['blast_db'] = blast_db
38
elif params['chimera_detection_method'] == 'ChimeraSlayer':
39
#copy the reference files to working dir
40
#ChimeraSlayer creates an index file of the ref and
41
#will crash without write permission in the ref seqs dir
42
aligned_reference_seqs_fp = params['aligned_reference_seqs_fp']
43
_, new_ref_filename = split(aligned_reference_seqs_fp)
44
copy(aligned_reference_seqs_fp, working_dir)
45
aligned_reference_seqs_fp = working_dir + "/" + new_ref_filename
47
self.files_to_remove.append(aligned_reference_seqs_fp)
48
params['aligned_reference_seqs_fp'] = aligned_reference_seqs_fp
50
#if given, also copy the unaligned ref db
51
reference_seqs_fp = params['reference_seqs_fp']
53
_, new_ref_filename = split(reference_seqs_fp)
54
copy(reference_seqs_fp, working_dir)
55
reference_seqs_fp = working_dir + "/" + new_ref_filename
58
reference_seqs_fp = write_degapped_fasta_to_file(
59
MinimalFastaParser(open(aligned_reference_seqs_fp)),
62
self.files_to_remove.append(reference_seqs_fp)
63
params['reference_seqs_fp'] = reference_seqs_fp
65
#build blast db of reference, otherwise ChimeraSlayer will do it
66
#and parallel jobs clash
67
_, db_files_to_remove = \
68
build_blast_db_from_fasta_path(reference_seqs_fp)
69
self.files_to_remove += db_files_to_remove
71
#make the index file globally
72
#Reason: ChimeraSlayer first checks to see if the index file is
73
#there. If not it tries to create it. This can lead to race
74
#condition if several parallel jobs try to create it at the same
76
make_cidx_file(aligned_reference_seqs_fp)
77
self.files_to_remove.append(aligned_reference_seqs_fp + ".cidx")
85
raise NotImplementedError
86
commands.append(command)
88
return commands, result_filepaths
90
def get_poller_command(python_exe_fp,poller_fp,expected_files_filepath,\
91
merge_map_filepath,deletion_list_filepath,process_run_results_f,\
92
seconds_to_sleep,command_prefix='/bin/bash; ',command_suffix='; exit'):
93
"""Generate command to initiate a poller to monitor/process completed runs
79
raise ValueError("Unrecognized chimera detection method '%s'." %
80
params['chimera_detection_method'])
82
def _get_job_commands(self, fasta_fps, output_dir, params, job_prefix,
83
working_dir, command_prefix='/bin/bash; ',
84
command_suffix='; exit'):
85
"""Generate identify_chimeric_seqs.py commands which should be run."""
86
# Create basenames for each of the output files. These will be filled
87
# in to create the full list of files created by all of the runs.
88
out_filenames = [job_prefix + '.%d_chimeric.txt']
90
# Create lists to store the results.
94
# Iterate over the input files.
95
for i, fasta_fp in enumerate(fasta_fps):
96
# Each run ends with moving the output file from the tmp dir to
97
# the output_dir. Build the command to perform the move here.
98
rename_command, current_result_filepaths = \
99
self._get_rename_command([fn % i for fn in out_filenames],
100
working_dir, output_dir)
101
result_filepaths += current_result_filepaths
103
optional_options = ""
104
if params['chimera_detection_method'] == 'blast_fragments':
106
'%s %s -i %s -t %s -m blast_fragments -o %s -n %s -d %s -e %s -b %s %s %s' % \
110
params['id_to_taxonomy_fp'],
111
working_dir + "/" + out_filenames[0] % i,
112
params['num_fragments'],
113
params['taxonomy_depth'],
114
params['max_e_value'],
118
elif params['chimera_detection_method'] == 'ChimeraSlayer':
119
optional_options = ""
120
if params['min_div_ratio']:
121
optional_options += " --min_div_ratio %s" % \
122
params['min_div_ratio']
123
if params['reference_seqs_fp']:
124
optional_options += " -r %s" % params['reference_seqs_fp']
127
'%s %s -i %s -a %s -m ChimeraSlayer -o %s %s %s %s' % \
131
params['aligned_reference_seqs_fp'],
132
working_dir + "/" + out_filenames[0] % i,
137
raise NotImplementedError
138
commands.append(command)
139
return commands, result_filepaths
141
def _get_poller_command(self,
142
expected_files_filepath,
144
deletion_list_filepath,
145
command_prefix='/bin/bash; ',
146
command_suffix='; exit'):
147
"""Generate command to initiate a poller to monitor/process completed runs
149
result = '%s poller.py -f %s -p %s -m %s -d %s -t %d %s' % \
151
expected_files_filepath,
152
self._process_run_results_f,
154
deletion_list_filepath,
155
self._seconds_to_sleep,
159
def _write_merge_map_file(self, input_file_basename, job_result_filepaths,
160
params, output_dir, merge_map_filepath,
162
f = open(merge_map_filepath,'w')
163
out_filepaths = [params['output_fp']]
166
logs_fps = [] #logs_fp currently not used
168
for fp in job_result_filepaths:
169
if fp.endswith('_chimeric.txt'):
174
for in_files, out_file in zip([chims_fps], out_filepaths):
175
f.write('\t'.join(in_files + [out_file]))
180
def basic_process_run_results_f(f):
181
""" Copy each list of infiles to each outfile and delete infiles
183
f: file containing one set of mapping instructions per line
186
f1.txt f2.txt f3.txt f_combined.txt
187
f1.log f2.log f3.log f_combined.log
189
If f contained the two lines above, this function would
190
concatenate f1.txt, f2.txt, and f3.txt into f_combined.txt
191
and f1.log, f2.log, and f3.log into f_combined.log
96
result = '%s %s %s -f %s -p %s -m %s -d %s -t %d %s' % \
100
expected_files_filepath,
101
process_run_results_f,
103
deletion_list_filepath,
193
infiles_lists,out_filepaths = parse_tmp_to_final_filepath_map_file(f)
194
for infiles_list, out_filepath in zip(infiles_lists,out_filepaths):
196
of = open(out_filepath,'w')
199
"Poller can't open final output file: %s" % out_filepath +\
200
"\nLeaving individual jobs output.\n Do you have write access?"
202
for fp in infiles_list:
203
for line in open(fp):
204
of.write('%s\n' % line.strip('\n'))
206
# It is a good idea to have your clean_up_callback return True.
207
# That way, if you get mixed up and pass it as check_run_complete_callback,
208
# you'll get an error right away rather than going into an infinite loop