2
from __future__ import division
4
__author__ = "Jai Ram Rideout"
5
__copyright__ = "Copyright 2012, The QIIME project"
6
__credits__ = ["Jai Ram Rideout"]
9
__maintainer__ = "Jai Ram Rideout"
10
__email__ = "jai.rideout@gmail.com"
11
__status__ = "Release"
13
from cogent.app.formatdb import build_blast_db_from_fasta_path
14
from qiime.parallel.util import ParallelWrapper
16
class ParallelTaxonomyAssigner(ParallelWrapper):
    """Base class for running assign_taxonomy.py jobs in parallel.

    Subclasses provide the method-specific parameter string (e.g. RDP or
    BLAST options); this class turns it into one shell command per split
    input fasta file and describes how per-job results are merged.
    """
    # The QIIME script each generated job command invokes.
    _script_name = 'assign_taxonomy.py'
    # Splitter used to break the input into per-job fasta chunks
    # (inherited from ParallelWrapper).
    _input_splitter = ParallelWrapper._split_fasta

    def _build_job_commands(self, tax_specific_param_str, fasta_fps,
                            output_dir, params, job_prefix, working_dir,
                            command_prefix='/bin/bash; ',
                            command_suffix='; exit'):
        """Generate assign_taxonomy.py commands which should be run.

        tax_specific_param_str: method-specific CLI options (built by a
            subclass) spliced into every command.
        fasta_fps: one input fasta filepath per job.

        Returns (commands, result_filepaths): the shell command strings
        and the output filepaths the jobs are expected to produce.
        """
        # Create basenames for each of the output files. These will be filled
        # in to create the full list of files created by all of the runs.
        out_filenames = [job_prefix + '.%d_tax_assignments.log',
                         job_prefix + '.%d_tax_assignments.txt']

        # Create lists to store the results.
        commands = []
        result_filepaths = []

        # Iterate over the input files.
        for i, fasta_fp in enumerate(fasta_fps):
            # Each run ends with moving the output file from the tmp dir to
            # the output_dir. Build the command to perform the move here.
            rename_command, current_result_filepaths = \
                self._get_rename_command([fn % i for fn in out_filenames],
                                         working_dir, output_dir)
            result_filepaths += current_result_filepaths

            command = '%s %s %s -o %s -i %s %s %s' %\
                (command_prefix,
                 self._script_name,
                 tax_specific_param_str,
                 working_dir,
                 fasta_fp,
                 rename_command,
                 command_suffix)
            commands.append(command)

        return commands, result_filepaths

    def _write_merge_map_file(self,
                              input_file_basename,
                              job_result_filepaths,
                              params,
                              output_dir,
                              merge_map_filepath,
                              failures=False):
        """Write the map of per-job result files to merged output files.

        Each line of the merge map is tab-separated: the per-job input
        files followed by the single merged file they combine into.
        """
        # Final (merged) output filepaths: assignments first, then the log.
        out_filepaths = [
            '%s/%s_tax_assignments.txt' % (output_dir, input_file_basename),
            '%s/%s_tax_assignments.log' % (output_dir, input_file_basename)]

        assignment_fps = []
        log_fps = []

        # Partition the per-job result files: *_tax_assignments.txt files
        # are assignments, everything else is treated as a log file.
        for fp in job_result_filepaths:
            if fp.endswith('_tax_assignments.txt'):
                assignment_fps.append(fp)
            else:
                log_fps.append(fp)

        # 'with' guarantees the map file is closed even if a write fails
        # (the original used a bare open()/close() pair).
        with open(merge_map_filepath, 'w') as f:
            for in_files, out_file in zip([assignment_fps, log_fps],
                                          out_filepaths):
                f.write('\t'.join(in_files + [out_file]))
                f.write('\n')
85
class ParallelRdpTaxonomyAssigner(ParallelTaxonomyAssigner):
    """Parallel taxonomy assignment using the RDP classifier."""

    def _get_job_commands(self, fasta_fps, output_dir, params, job_prefix,
                          working_dir, command_prefix=None,
                          command_suffix='; exit'):
        """Build one assign_taxonomy.py (RDP) command per input fasta."""
        # The RDP classifier jar must be visible to every job via the
        # RDP_JAR_PATH environment variable.
        command_prefix = command_prefix or \
            '/bin/bash; export RDP_JAR_PATH=%s; ' % params['rdp_classifier_fp']

        rdp_params = '-m rdp -c %1.2f --rdp_max_memory %d ' % (
            params['confidence'], params['rdp_max_memory'])
        # Only pass a custom training set when both pieces are supplied.
        if params['id_to_taxonomy_fp'] and params['reference_seqs_fp']:
            rdp_params += '-t %s -r %s' % (params['id_to_taxonomy_fp'],
                                           params['reference_seqs_fp'])

        return self._build_job_commands(rdp_params, fasta_fps, output_dir,
                                        params, job_prefix, working_dir,
                                        command_prefix, command_suffix)
105
class ParallelBlastTaxonomyAssigner(ParallelTaxonomyAssigner):
    """Parallel taxonomy assignment using BLAST."""

    def _precommand_initiation(self, input_fp, output_dir, working_dir,
                               params):
        """Build a shared BLAST db once, before any jobs are launched.

        Mutates params in place: fills in params['blast_db'] when it is
        not already set.
        """
        if not params['blast_db']:
            # Build the blast database from the reference_seqs_fp -- all procs
            # will then access one db rather than create one per proc.
            blast_db, db_files_to_remove = \
                build_blast_db_from_fasta_path(params['reference_seqs_fp'])
            # Register the generated db files for cleanup after the run.
            self.files_to_remove += db_files_to_remove
            params['blast_db'] = blast_db
117
def _get_job_commands(self, fasta_fps, output_dir, params, job_prefix,
118
working_dir, command_prefix=None,
119
command_suffix='; exit'):
120
command_prefix = command_prefix or \
121
'/bin/bash; cd %s; export BLASTMAT=%s;' % (working_dir,
122
params['blastmat_dir'])
124
blast_params = '-m blast -e %s -b %s -t %s ' % (
125
params['e_value'], params['blast_db'],
126
params['id_to_taxonomy_fp'])
128
return self._build_job_commands(blast_params, fasta_fps, output_dir,
129
params, job_prefix, working_dir, command_prefix,