5
5
__author__ = "Greg Caporaso"
6
6
__copyright__ = "Copyright 2011, The QIIME Project"
7
__credits__ = ["Greg Caporaso","Dan Knights"]
7
__credits__ = ["Greg Caporaso","Dan Knights", "Jose Antonio Navas Molina"]
10
10
__maintainer__ = "Greg Caporaso"
11
11
__email__ = "gregcaporaso@gmail.com"
12
12
__status__ = "Release"
15
from qiime.util import make_option
16
from qiime.util import parse_command_line_parameters,\
17
load_qiime_config, get_options_lookup, get_qiime_scripts_dir
18
from qiime.parallel.pick_otus_blast import get_job_commands,\
20
from qiime.parallel.util import split_fasta, get_random_job_prefix,\
21
write_jobs_file,submit_jobs, compute_seqs_per_file,\
22
build_filepaths_from_filepaths, write_filepaths_to_file,\
23
write_merge_map_file_pick_otus
24
from os import popen, system, makedirs, mkdir
25
from os.path import split, splitext, join
26
from subprocess import check_call, CalledProcessError
27
from qiime.util import get_tmp_filename
28
from cogent.app.formatdb import build_blast_db_from_fasta_path
15
from qiime.util import (parse_command_line_parameters,
18
from qiime.parallel.pick_otus import ParallelPickOtusBlast
30
qiime_config = load_qiime_config()
31
20
options_lookup = get_options_lookup()
59
48
'template alignment [default: %default]'),\
61
50
make_option('-b','--blast_db',action='store',\
62
type='string',help='database to blast against '+\
51
type='blast_db',help='database to blast against '+\
63
52
'[default: %default]'),\
65
54
make_option('--min_aligned_percent',
66
55
help=('Minimum percent of query sequence that can be aligned '
67
56
'to consider a hit (BLAST OTU picker only) [default: %default]'),
68
57
default=0.50,type='float'),
70
#Define parallel-script-specific parameters
71
make_option('-N','--pick_otus_fp',action='store',\
72
type='existing_filepath',help='full path to '+\
73
'scripts/pick_otus.py [default: %default]',\
74
default=join(get_qiime_scripts_dir(),'pick_otus.py')),\
76
options_lookup['jobs_to_start'],\
77
options_lookup['poller_fp'],\
78
options_lookup['retain_temp_files'],\
79
options_lookup['suppress_submit_jobs'],\
80
options_lookup['poll_directly'],\
81
options_lookup['cluster_jobs_fp'],\
82
options_lookup['suppress_polling'],\
83
options_lookup['job_prefix'],\
84
options_lookup['python_exe_fp'],\
85
options_lookup['seconds_to_sleep']\
59
options_lookup['jobs_to_start'],
60
options_lookup['retain_temp_files'],
61
options_lookup['suppress_submit_jobs'],
62
options_lookup['poll_directly'],
63
options_lookup['cluster_jobs_fp'],
64
options_lookup['suppress_polling'],
65
options_lookup['job_prefix'],
66
options_lookup['seconds_to_sleep']
88
69
script_info['version'] = __version__
93
74
if opts.blast_db == None and opts.refseqs_fp == None:
94
75
option_parser.error('Either blast_db or refseqs_fp must be provided.')
96
# create local copies of command-line options
97
python_exe_fp = opts.python_exe_fp
98
pick_otus_fp = opts.pick_otus_fp
99
refseqs_fp = opts.refseqs_fp
100
cluster_jobs_fp = opts.cluster_jobs_fp
101
input_fasta_fp = opts.input_fasta_fp
102
jobs_to_start = opts.jobs_to_start
103
output_dir = opts.output_dir
104
poller_fp = opts.poller_fp
105
retain_temp_files = opts.retain_temp_files
106
suppress_polling = opts.suppress_polling
107
seconds_to_sleep = opts.seconds_to_sleep
108
max_e_value = opts.max_e_value
109
similarity = opts.similarity
110
poll_directly = opts.poll_directly
111
min_aligned_percent = opts.min_aligned_percent
113
created_temp_paths = []
115
if not opts.blast_db:
116
# Build the blast database from the reference_seqs_fp -- all procs
117
# will then access one db rather than create one per proc
118
blast_db, db_files_to_remove = \
119
build_blast_db_from_fasta_path(refseqs_fp)
120
created_temp_paths += db_files_to_remove
122
blast_db = opts.blast_db
124
# split the input filepath into directory and filename, base filename and
126
input_dir, input_fasta_fn = split(input_fasta_fp)
127
input_file_basename, input_fasta_ext = splitext(input_fasta_fn)
129
# set the job_prefix either based on what the user passed in,
130
# or a random string beginning with RDP
131
job_prefix = opts.job_prefix or get_random_job_prefix('POTU')
133
# A temporary output directory is created in output_dir named
134
# job_prefix. Output files are then moved from the temporary
135
# directory to the output directory when they are complete, allowing
136
# a poller to detect when runs complete by the presence of their
138
working_dir = '%s/%s' % (output_dir,job_prefix)
140
makedirs(working_dir)
141
created_temp_paths.append(working_dir)
143
# working dir already exists
146
# compute the number of sequences that should be included in
147
# each file after splitting the input fasta file
148
num_seqs_per_file = compute_seqs_per_file(input_fasta_fp,jobs_to_start)
150
# split the fasta files and get the list of resulting files
152
split_fasta(open(input_fasta_fp),num_seqs_per_file,\
153
job_prefix,working_dir=output_dir)
154
created_temp_paths += tmp_fasta_fps
156
# build the filepath for the 'jobs script'
157
jobs_fp = '%s/%sjobs.txt' % (output_dir, job_prefix)
158
created_temp_paths.append(jobs_fp)
160
# generate the list of commands to be pushed out to nodes and the list of
161
# output files generated by each job
162
commands, job_result_filepaths = \
163
get_job_commands(python_exe_fp,pick_otus_fp,tmp_fasta_fps,
164
output_dir,blast_db,job_prefix,working_dir,max_e_value,similarity,
166
created_temp_paths += job_result_filepaths
168
# Set up poller apparatus if the user does not suppress polling
169
if not suppress_polling:
170
# Write the list of files which must exist for the jobs to be
171
# considered complete
172
expected_files_filepath = '%s/expected_out_files.txt' % working_dir
173
write_filepaths_to_file(job_result_filepaths,expected_files_filepath)
174
created_temp_paths.append(expected_files_filepath)
176
# Write the mapping file which described how the output files from
177
# each job should be merged into the final output files
178
merge_map_filepath = '%s/merge_map.txt' % working_dir
179
process_run_results_f =\
180
'qiime.parallel.pick_otus_blast.parallel_blast_process_run_results_f'
181
write_merge_map_file_pick_otus(job_result_filepaths,output_dir,\
182
merge_map_filepath,input_file_basename)
183
created_temp_paths.append(merge_map_filepath)
185
# Create the filepath listing the temporary files to be deleted,
186
# but don't write it yet
187
deletion_list_filepath = '%s/deletion_list.txt' % working_dir
188
created_temp_paths.append(deletion_list_filepath)
190
# Generate the command to run the poller, and the list of temp files
191
# created by the poller
192
if not poll_directly:
193
poller_command, poller_result_filepaths =\
194
get_poller_command(python_exe_fp,poller_fp,expected_files_filepath,\
195
merge_map_filepath,deletion_list_filepath,process_run_results_f,\
196
seconds_to_sleep=seconds_to_sleep)
197
created_temp_paths += poller_result_filepaths
198
# append the poller command to the list of job commands
199
commands.append(poller_command)
201
poller_command, poller_result_filepaths =\
202
get_poller_command(python_exe_fp,poller_fp,expected_files_filepath,\
203
merge_map_filepath,deletion_list_filepath,process_run_results_f,\
204
seconds_to_sleep=seconds_to_sleep,command_prefix='',command_suffix='')
205
created_temp_paths += poller_result_filepaths
207
if not retain_temp_files:
208
# If the user wants temp files deleted, now write the list of
209
# temp files to be deleted
210
write_filepaths_to_file(created_temp_paths,deletion_list_filepath)
212
# Otherwise just write an empty file
213
write_filepaths_to_file([],deletion_list_filepath)
215
# write the commands to the 'jobs files'
216
write_jobs_file(commands,job_prefix=job_prefix,jobs_fp=jobs_fp)
218
# submit the jobs file using cluster_jobs, if not suppressed by the
220
if not opts.suppress_submit_jobs:
221
submit_jobs(cluster_jobs_fp,jobs_fp,job_prefix)
225
check_call(poller_command.split())
226
except CalledProcessError, e:
227
print '**Error occuring when calling the poller directly. '+\
228
'Jobs may have been submitted, but are not being polled.'
77
# create dict of command-line options
78
params = eval(str(opts))
80
parallel_runner = ParallelPickOtusBlast(
81
cluster_jobs_fp=opts.cluster_jobs_fp,
82
jobs_to_start=opts.jobs_to_start,
83
retain_temp_files=opts.retain_temp_files,
84
suppress_polling=opts.suppress_polling,
85
seconds_to_sleep=opts.seconds_to_sleep)
86
parallel_runner(opts.input_fasta_fp,
89
job_prefix=opts.job_prefix,
90
poll_directly=opts.poll_directly,
91
suppress_submit_jobs=False)