~ubuntu-branches/ubuntu/trusty/qiime/trusty

« back to all changes in this revision

Viewing changes to scripts/parallel_pick_otus_blast.py

  • Committer: Package Import Robot
  • Author(s): Andreas Tille
  • Date: 2013-06-17 18:28:26 UTC
  • mfrom: (9.1.2 sid)
  • Revision ID: package-import@ubuntu.com-20130617182826-376az5ad080a0sfe
Tags: 1.7.0+dfsg-1
Upload preparations done for BioLinux to Debian

Show diffs side-by-side

added

removed

Lines of Context:
4
4
 
5
5
__author__ = "Greg Caporaso"
6
6
__copyright__ = "Copyright 2011, The QIIME Project"
7
 
__credits__ = ["Greg Caporaso","Dan Knights"]
 
7
__credits__ = ["Greg Caporaso","Dan Knights", "Jose Antonio Navas Molina"]
8
8
__license__ = "GPL"
9
 
__version__ = "1.5.0"
 
9
__version__ = "1.7.0"
10
10
__maintainer__ = "Greg Caporaso"
11
11
__email__ = "gregcaporaso@gmail.com"
12
12
__status__ = "Release"
13
13
 
14
14
 
15
 
from qiime.util import make_option
16
 
from qiime.util import parse_command_line_parameters,\
17
 
    load_qiime_config, get_options_lookup, get_qiime_scripts_dir
18
 
from qiime.parallel.pick_otus_blast import get_job_commands,\
19
 
    get_poller_command
20
 
from qiime.parallel.util import split_fasta, get_random_job_prefix,\
21
 
    write_jobs_file,submit_jobs, compute_seqs_per_file,\
22
 
    build_filepaths_from_filepaths, write_filepaths_to_file,\
23
 
    write_merge_map_file_pick_otus
24
 
from os import popen, system, makedirs, mkdir
25
 
from os.path import split, splitext, join
26
 
from subprocess import check_call, CalledProcessError
27
 
from qiime.util import get_tmp_filename
28
 
from cogent.app.formatdb import build_blast_db_from_fasta_path
 
15
from qiime.util import (parse_command_line_parameters,
 
16
                        get_options_lookup,
 
17
                        make_option)
 
18
from qiime.parallel.pick_otus import ParallelPickOtusBlast
29
19
 
30
 
qiime_config = load_qiime_config()
31
20
options_lookup = get_options_lookup()
32
21
 
33
22
script_info={}
59
48
           'template alignment [default: %default]'),\
60
49
    
61
50
    make_option('-b','--blast_db',action='store',\
62
 
           type='string',help='database to blast against '+\
 
51
           type='blast_db',help='database to blast against '+\
63
52
           '[default: %default]'),\
64
53
           
65
54
    make_option('--min_aligned_percent',
66
55
        help=('Minimum percent of query sequence that can be aligned '
67
56
              'to consider a hit (BLAST OTU picker only) [default: %default]'),
68
57
              default=0.50,type='float'),
69
 
          
70
 
    #Define parallel-script-specific parameters
71
 
    make_option('-N','--pick_otus_fp',action='store',\
72
 
           type='existing_filepath',help='full path to '+\
73
 
           'scripts/pick_otus.py [default: %default]',\
74
 
           default=join(get_qiime_scripts_dir(),'pick_otus.py')),\
75
58
        
76
 
 options_lookup['jobs_to_start'],\
77
 
 options_lookup['poller_fp'],\
78
 
 options_lookup['retain_temp_files'],\
79
 
 options_lookup['suppress_submit_jobs'],\
80
 
 options_lookup['poll_directly'],\
81
 
 options_lookup['cluster_jobs_fp'],\
82
 
 options_lookup['suppress_polling'],\
83
 
 options_lookup['job_prefix'],\
84
 
 options_lookup['python_exe_fp'],\
85
 
 options_lookup['seconds_to_sleep']\
 
59
 options_lookup['jobs_to_start'],
 
60
 options_lookup['retain_temp_files'],
 
61
 options_lookup['suppress_submit_jobs'],
 
62
 options_lookup['poll_directly'],
 
63
 options_lookup['cluster_jobs_fp'],
 
64
 options_lookup['suppress_polling'],
 
65
 options_lookup['job_prefix'],
 
66
 options_lookup['seconds_to_sleep']
86
67
]
87
68
 
88
69
script_info['version'] = __version__
93
74
    if opts.blast_db == None and opts.refseqs_fp == None:
94
75
        option_parser.error('Either blast_db or refseqs_fp must be provided.')
95
76
 
96
 
   # create local copies of command-line options
97
 
    python_exe_fp = opts.python_exe_fp
98
 
    pick_otus_fp = opts.pick_otus_fp
99
 
    refseqs_fp = opts.refseqs_fp
100
 
    cluster_jobs_fp = opts.cluster_jobs_fp
101
 
    input_fasta_fp = opts.input_fasta_fp 
102
 
    jobs_to_start = opts.jobs_to_start
103
 
    output_dir = opts.output_dir
104
 
    poller_fp = opts.poller_fp
105
 
    retain_temp_files = opts.retain_temp_files
106
 
    suppress_polling = opts.suppress_polling
107
 
    seconds_to_sleep = opts.seconds_to_sleep
108
 
    max_e_value = opts.max_e_value
109
 
    similarity = opts.similarity
110
 
    poll_directly = opts.poll_directly
111
 
    min_aligned_percent = opts.min_aligned_percent
112
 
 
113
 
    created_temp_paths = []
114
 
 
115
 
    if not opts.blast_db:        
116
 
        # Build the blast database from the reference_seqs_fp -- all procs
117
 
        # will then access one db rather than create one per proc
118
 
        blast_db, db_files_to_remove = \
119
 
             build_blast_db_from_fasta_path(refseqs_fp)
120
 
        created_temp_paths += db_files_to_remove
121
 
    else:
122
 
        blast_db = opts.blast_db
123
 
    
124
 
    # split the input filepath into directory and filename, base filename and
125
 
    # extension
126
 
    input_dir, input_fasta_fn = split(input_fasta_fp)
127
 
    input_file_basename, input_fasta_ext = splitext(input_fasta_fn)
128
 
    
129
 
    # set the job_prefix either based on what the user passed in,
130
 
    # or a random string beginning with RDP
131
 
    job_prefix = opts.job_prefix or get_random_job_prefix('POTU')
132
 
    
133
 
    # A temporary output directory is created in output_dir named
134
 
    # job_prefix. Output files are then moved from the temporary 
135
 
    # directory to the output directory when they are complete, allowing
136
 
    # a poller to detect when runs complete by the presence of their
137
 
    # output files.
138
 
    working_dir = '%s/%s' % (output_dir,job_prefix)
139
 
    try:
140
 
        makedirs(working_dir)
141
 
        created_temp_paths.append(working_dir)
142
 
    except OSError:
143
 
        # working dir already exists
144
 
        pass
145
 
    
146
 
    # compute the number of sequences that should be included in
147
 
    # each file after splitting the input fasta file   
148
 
    num_seqs_per_file = compute_seqs_per_file(input_fasta_fp,jobs_to_start)
149
 
     
150
 
    # split the fasta files and get the list of resulting files
151
 
    tmp_fasta_fps =\
152
 
      split_fasta(open(input_fasta_fp),num_seqs_per_file,\
153
 
      job_prefix,working_dir=output_dir)
154
 
    created_temp_paths += tmp_fasta_fps
155
 
    
156
 
    # build the filepath for the 'jobs script'
157
 
    jobs_fp = '%s/%sjobs.txt' % (output_dir, job_prefix)
158
 
    created_temp_paths.append(jobs_fp)
159
 
    
160
 
    # generate the list of commands to be pushed out to nodes and the list of
161
 
    # output files generated by each job
162
 
    commands, job_result_filepaths = \
163
 
     get_job_commands(python_exe_fp,pick_otus_fp,tmp_fasta_fps,
164
 
     output_dir,blast_db,job_prefix,working_dir,max_e_value,similarity,
165
 
     min_aligned_percent)
166
 
    created_temp_paths += job_result_filepaths
167
 
 
168
 
    # Set up poller apparatus if the user does not suppress polling
169
 
    if not suppress_polling:
170
 
        # Write the list of files which must exist for the jobs to be 
171
 
        # considered complete
172
 
        expected_files_filepath = '%s/expected_out_files.txt' % working_dir
173
 
        write_filepaths_to_file(job_result_filepaths,expected_files_filepath)
174
 
        created_temp_paths.append(expected_files_filepath)
175
 
        
176
 
        # Write the mapping file which described how the output files from
177
 
        # each job should be merged into the final output files
178
 
        merge_map_filepath = '%s/merge_map.txt' % working_dir
179
 
        process_run_results_f =\
180
 
         'qiime.parallel.pick_otus_blast.parallel_blast_process_run_results_f'
181
 
        write_merge_map_file_pick_otus(job_result_filepaths,output_dir,\
182
 
            merge_map_filepath,input_file_basename)
183
 
        created_temp_paths.append(merge_map_filepath)
184
 
        
185
 
        # Create the filepath listing the temporary files to be deleted,
186
 
        # but don't write it yet
187
 
        deletion_list_filepath = '%s/deletion_list.txt' % working_dir
188
 
        created_temp_paths.append(deletion_list_filepath)
189
 
        
190
 
        # Generate the command to run the poller, and the list of temp files
191
 
        # created by the poller
192
 
        if not poll_directly:
193
 
            poller_command, poller_result_filepaths =\
194
 
             get_poller_command(python_exe_fp,poller_fp,expected_files_filepath,\
195
 
             merge_map_filepath,deletion_list_filepath,process_run_results_f,\
196
 
             seconds_to_sleep=seconds_to_sleep)
197
 
            created_temp_paths += poller_result_filepaths
198
 
            # append the poller command to the list of job commands
199
 
            commands.append(poller_command)
200
 
        else:
201
 
            poller_command, poller_result_filepaths =\
202
 
             get_poller_command(python_exe_fp,poller_fp,expected_files_filepath,\
203
 
             merge_map_filepath,deletion_list_filepath,process_run_results_f,\
204
 
             seconds_to_sleep=seconds_to_sleep,command_prefix='',command_suffix='')
205
 
            created_temp_paths += poller_result_filepaths
206
 
        
207
 
        if not retain_temp_files:
208
 
            # If the user wants temp files deleted, now write the list of 
209
 
            # temp files to be deleted
210
 
            write_filepaths_to_file(created_temp_paths,deletion_list_filepath)
211
 
        else:
212
 
            # Otherwise just write an empty file
213
 
            write_filepaths_to_file([],deletion_list_filepath)
214
 
     
215
 
    # write the commands to the 'jobs files'
216
 
    write_jobs_file(commands,job_prefix=job_prefix,jobs_fp=jobs_fp)
217
 
    
218
 
    # submit the jobs file using cluster_jobs, if not suppressed by the
219
 
    # user
220
 
    if not opts.suppress_submit_jobs:
221
 
        submit_jobs(cluster_jobs_fp,jobs_fp,job_prefix)
222
 
        
223
 
    if poll_directly:
224
 
        try:
225
 
            check_call(poller_command.split())
226
 
        except CalledProcessError, e:
227
 
            print '**Error occuring when calling the poller directly. '+\
228
 
            'Jobs may have been submitted, but are not being polled.'
229
 
            print str(e)
230
 
            exit(-1)
231
 
            
 
77
    # create dict of command-line options
 
78
    params = eval(str(opts))
 
79
    
 
80
    parallel_runner = ParallelPickOtusBlast(
 
81
                                        cluster_jobs_fp=opts.cluster_jobs_fp,
 
82
                                        jobs_to_start=opts.jobs_to_start,
 
83
                                        retain_temp_files=opts.retain_temp_files,
 
84
                                        suppress_polling=opts.suppress_polling,
 
85
                                        seconds_to_sleep=opts.seconds_to_sleep)
 
86
    parallel_runner(opts.input_fasta_fp,
 
87
                    opts.output_dir,
 
88
                    params,
 
89
                    job_prefix=opts.job_prefix,
 
90
                    poll_directly=opts.poll_directly,
 
91
                    suppress_submit_jobs=False)
232
92
            
233
93
 
234
94