~ubuntu-branches/ubuntu/trusty/qiime/trusty

« back to all changes in this revision

Viewing changes to qiime/parallel/identify_chimeric_seqs.py

  • Committer: Package Import Robot
  • Author(s): Andreas Tille
  • Date: 2013-06-17 18:28:26 UTC
  • mfrom: (9.1.2 sid)
  • Revision ID: package-import@ubuntu.com-20130617182826-376az5ad080a0sfe
Tags: 1.7.0+dfsg-1
Upload preparations done for BioLinux to Debian

Show diffs side-by-side

added

removed

Lines of Context:
1
1
#!/usr/bin/env python
2
 
# Author: Jens Reeder (jens.reeder@gmail.com)
3
 
# identify_chimeric_seqs.py
4
 
 
5
2
from __future__ import division
6
 
from qiime.parallel.poller import basic_process_run_results_f
7
 
from qiime.parallel.util import get_rename_command
8
3
 
9
 
__author__ = "Greg Caporaso"
10
 
__copyright__ = "Copyright 2011, The QIIME Project"
11
 
__credits__ = ["Greg Caporaso","Jens Reeder"] 
 
4
__author__ = "Jai Ram Rideout"
 
5
__copyright__ = "Copyright 2012, The QIIME project"
 
6
__credits__ = ["Jai Ram Rideout"]
12
7
__license__ = "GPL"
13
 
__version__ = "1.5.0"
14
 
__maintainer__ = "Greg Caporaso"
15
 
__email__ = "gregcaporaso@gmail.com"
 
8
__version__ = "1.7.0"
 
9
__maintainer__ = "Jai Ram Rideout"
 
10
__email__ = "jai.rideout@gmail.com"
16
11
__status__ = "Release"
17
12
 
18
 
def get_job_commands(python_exe_fp, identify_chimeric_seqs_fp, fasta_fps,
19
 
                     output_dir, ref_seqs_fp, job_prefix, working_dir,
20
 
                     aligned_reference_seqs_fp, blast_db,
21
 
                     chimera_detection_method, min_div_ratio, num_fragments,
22
 
                     taxonomy_depth, max_e_value, id_to_taxonomy_fp,
23
 
                     command_prefix='', command_suffix=''):
24
 
#                     command_prefix='/bin/bash; ', command_suffix='; exit'):
25
 
    """Generate identify_chimeric_seqs commands which should be run
26
 
    """
27
 
    # Create basenames for each of the output files. These will be filled
28
 
    # in to create the full list of files created by all of the runs.
29
 
    out_filenames = [job_prefix + '.%d_chimeric.txt']
30
 
    
31
 
    # Create lists to store the results
32
 
    commands = []
33
 
    result_filepaths = []
34
 
    
35
 
    # Iterate over the input files
36
 
    for i,fasta_fp in enumerate(fasta_fps):
37
 
        # Each run ends with moving the output file from the tmp dir to
38
 
        # the output_dir. Build the command to perform the move here.
39
 
        rename_command, current_result_filepaths = get_rename_command(\
40
 
        [fn % i for fn in out_filenames], working_dir, output_dir)
41
 
        result_filepaths += current_result_filepaths
42
 
 
43
 
        #Need to be filled
44
 
        optional_options = ""
45
 
 
46
 
        if chimera_detection_method=='blast_fragments':
47
 
            
48
 
            if ref_seqs_fp:
49
 
                optional_options += " -r %s" % ref_seqs_fp
50
 
            if blast_db:
51
 
                optional_options += " -b %s" % blast_db
52
 
 
53
 
            command = \
54
 
                '%s %s %s -i %s -t %s -m blast_fragments -o %s -n %s -d %s -e %s %s %s %s' %\
55
 
                (command_prefix,
56
 
                 python_exe_fp,
57
 
                 identify_chimeric_seqs_fp,
58
 
                 fasta_fp,
59
 
                 id_to_taxonomy_fp,
60
 
                 working_dir+"/"+out_filenames[0] % i,
61
 
                 num_fragments,
62
 
                 taxonomy_depth,
63
 
                 max_e_value,
64
 
                 optional_options,  
65
 
                 rename_command,
66
 
                 command_suffix)
67
 
            
68
 
        elif chimera_detection_method=='ChimeraSlayer':
69
 
            if min_div_ratio:
70
 
                optional_options += " --min_div_ratio %s" % min_div_ratio
71
 
            if ref_seqs_fp:
72
 
                optional_options += " -r %s" % ref_seqs_fp
73
 
            command = \
74
 
                '%s %s %s -i %s -a %s -m ChimeraSlayer -o %s %s %s %s' %\
75
 
                (command_prefix,
76
 
                 python_exe_fp,
77
 
                 identify_chimeric_seqs_fp,
78
 
                 fasta_fp,
79
 
                 aligned_reference_seqs_fp,
80
 
                 working_dir+"/"+out_filenames[0] % i,
81
 
                 optional_options,    
82
 
                 rename_command,
83
 
                 command_suffix)
 
13
from os.path import split
 
14
from shutil import copy
 
15
from cogent.app.formatdb import build_blast_db_from_fasta_path
 
16
from cogent.parse.fasta import MinimalFastaParser
 
17
from qiime.identify_chimeric_seqs import make_cidx_file
 
18
from qiime.parse import parse_tmp_to_final_filepath_map_file
 
19
from qiime.util import write_degapped_fasta_to_file
 
20
 
 
21
from qiime.parallel.util import ParallelWrapper
 
22
 
 
23
class ParallelChimericSequenceIdentifier(ParallelWrapper):
 
24
    _script_name = 'identify_chimeric_seqs.py'
 
25
    _input_splitter = ParallelWrapper._split_fasta
 
26
    _job_prefix = 'CHIM'
 
27
    _process_run_results_f = \
 
28
         'qiime.parallel.identify_chimeric_seqs.basic_process_run_results_f'
 
29
 
 
30
    def _precommand_initiation(self, input_fp, output_dir, working_dir,
 
31
                               params):
 
32
        if params['chimera_detection_method'] == 'blast_fragments':
 
33
            blast_db, db_files_to_remove = \
 
34
                 build_blast_db_from_fasta_path(params['reference_seqs_fp'],
 
35
                                                output_dir=working_dir)
 
36
            self.files_to_remove += db_files_to_remove
 
37
            params['blast_db'] = blast_db
 
38
        elif params['chimera_detection_method'] == 'ChimeraSlayer':
 
39
            #copy the reference files to working dir
 
40
            #ChimeraSlayer creates an index file of the ref and
 
41
            #will crash without write permission in the ref seqs dir
 
42
            aligned_reference_seqs_fp = params['aligned_reference_seqs_fp']
 
43
            _, new_ref_filename = split(aligned_reference_seqs_fp)
 
44
            copy(aligned_reference_seqs_fp, working_dir)
 
45
            aligned_reference_seqs_fp = working_dir + "/" + new_ref_filename
 
46
 
 
47
            self.files_to_remove.append(aligned_reference_seqs_fp)
 
48
            params['aligned_reference_seqs_fp'] = aligned_reference_seqs_fp
 
49
     
 
50
            #if given, also copy the unaligned ref db
 
51
            reference_seqs_fp = params['reference_seqs_fp']
 
52
            if reference_seqs_fp:
 
53
                _, new_ref_filename = split(reference_seqs_fp)
 
54
                copy(reference_seqs_fp, working_dir)
 
55
                reference_seqs_fp = working_dir + "/" + new_ref_filename
 
56
            else:
 
57
                #otherwise create it
 
58
                reference_seqs_fp = write_degapped_fasta_to_file(
 
59
                        MinimalFastaParser(open(aligned_reference_seqs_fp)),
 
60
                                           tmp_dir=working_dir)
 
61
            #delete it afterwards
 
62
            self.files_to_remove.append(reference_seqs_fp)
 
63
            params['reference_seqs_fp'] = reference_seqs_fp
 
64
 
 
65
            #build blast db of reference, otherwise ChimeraSlayer will do it
 
66
            #and parallel jobs clash
 
67
            _, db_files_to_remove = \
 
68
                 build_blast_db_from_fasta_path(reference_seqs_fp)
 
69
            self.files_to_remove += db_files_to_remove
 
70
 
 
71
            #make the index file globally
 
72
            #Reason: ChimeraSlayer first checks to see if the index file is
 
73
            #there. If not it tries to create it. This can lead to race
 
74
            #condition if several parallel jobs try to create it at the same
 
75
            #time.
 
76
            make_cidx_file(aligned_reference_seqs_fp)
 
77
            self.files_to_remove.append(aligned_reference_seqs_fp + ".cidx")
84
78
        else:
85
 
           raise NotImplementedError
86
 
        commands.append(command)
87
 
 
88
 
    return commands, result_filepaths
89
 
 
90
 
def get_poller_command(python_exe_fp,poller_fp,expected_files_filepath,\
91
 
    merge_map_filepath,deletion_list_filepath,process_run_results_f,\
92
 
    seconds_to_sleep,command_prefix='/bin/bash; ',command_suffix='; exit'):
93
 
    """Generate command to initiate a poller to monitior/process completed runs
 
79
            raise ValueError("Unrecognized chimera detection method '%s'." %
 
80
                             params['chimera_detection_method'])
 
81
 
 
82
    def _get_job_commands(self, fasta_fps, output_dir, params, job_prefix,
 
83
                          working_dir, command_prefix='/bin/bash; ',
 
84
                          command_suffix='; exit'):
 
85
        """Generate identify_chimeric_seqs.py commands which should be run."""
 
86
        # Create basenames for each of the output files. These will be filled
 
87
        # in to create the full list of files created by all of the runs.
 
88
        out_filenames = [job_prefix + '.%d_chimeric.txt']
 
89
 
 
90
        # Create lists to store the results.
 
91
        commands = []
 
92
        result_filepaths = []
 
93
 
 
94
        # Iterate over the input files.
 
95
        for i, fasta_fp in enumerate(fasta_fps):
 
96
            # Each run ends with moving the output file from the tmp dir to
 
97
            # the output_dir. Build the command to perform the move here.
 
98
            rename_command, current_result_filepaths = \
 
99
                    self._get_rename_command([fn % i for fn in out_filenames],
 
100
                                             working_dir, output_dir)
 
101
            result_filepaths += current_result_filepaths
 
102
 
 
103
            optional_options = ""
 
104
            if params['chimera_detection_method'] == 'blast_fragments':
 
105
                command = \
 
106
                    '%s %s -i %s -t %s -m blast_fragments -o %s -n %s -d %s -e %s -b %s %s %s' % \
 
107
                    (command_prefix,
 
108
                     self._script_name,
 
109
                     fasta_fp,
 
110
                     params['id_to_taxonomy_fp'],
 
111
                     working_dir + "/" + out_filenames[0] % i,
 
112
                     params['num_fragments'],
 
113
                     params['taxonomy_depth'],
 
114
                     params['max_e_value'],
 
115
                     params['blast_db'],
 
116
                     rename_command,
 
117
                     command_suffix)
 
118
            elif params['chimera_detection_method'] == 'ChimeraSlayer':
 
119
                optional_options = ""
 
120
                if params['min_div_ratio']:
 
121
                    optional_options += " --min_div_ratio %s" % \
 
122
                                        params['min_div_ratio']
 
123
                if params['reference_seqs_fp']:
 
124
                    optional_options += " -r %s" % params['reference_seqs_fp']
 
125
 
 
126
                command = \
 
127
                    '%s %s -i %s -a %s -m ChimeraSlayer -o %s %s %s %s' % \
 
128
                    (command_prefix,
 
129
                     self._script_name,
 
130
                     fasta_fp,
 
131
                     params['aligned_reference_seqs_fp'],
 
132
                     working_dir + "/" + out_filenames[0] % i,
 
133
                     optional_options,    
 
134
                     rename_command,
 
135
                     command_suffix)
 
136
            else:
 
137
                raise NotImplementedError
 
138
            commands.append(command)
 
139
        return commands, result_filepaths
 
140
 
 
141
    def _get_poller_command(self,
 
142
                            expected_files_filepath,
 
143
                            merge_map_filepath,
 
144
                            deletion_list_filepath,
 
145
                            command_prefix='/bin/bash; ',
 
146
                            command_suffix='; exit'):
 
147
        """Generate command to initiate a poller to monitior/process completed runs
 
148
        """
 
149
        result = '%s poller.py -f %s -p %s -m %s -d %s -t %d %s' % \
 
150
         (command_prefix,
 
151
          expected_files_filepath,
 
152
          self._process_run_results_f,
 
153
          merge_map_filepath,
 
154
          deletion_list_filepath,
 
155
          self._seconds_to_sleep,
 
156
          command_suffix)
 
157
        return result, []
 
158
 
 
159
    def _write_merge_map_file(self, input_file_basename, job_result_filepaths,
 
160
                              params, output_dir, merge_map_filepath,
 
161
                              failures=False):
 
162
        f = open(merge_map_filepath,'w')
 
163
        out_filepaths = [params['output_fp']]
 
164
 
 
165
        chims_fps = []
 
166
        logs_fps = [] #logs_fp currently not used
 
167
 
 
168
        for fp in job_result_filepaths:
 
169
            if fp.endswith('_chimeric.txt'):
 
170
                chims_fps.append(fp)
 
171
            else:
 
172
                log_fps.append(fp)
 
173
 
 
174
        for in_files, out_file in zip([chims_fps], out_filepaths):
 
175
            f.write('\t'.join(in_files + [out_file]))
 
176
            f.write('\n')
 
177
        f.close()
 
178
 
 
179
 
 
180
def basic_process_run_results_f(f):
 
181
    """ Copy each list of infiles to each outfile and delete infiles
 
182
    
 
183
        f: file containing one set of mapping instructions per line
 
184
        
 
185
        example f:
 
186
         f1.txt f2.txt f3.txt f_combined.txt
 
187
         f1.log f2.log f3.log f_combined.log
 
188
         
 
189
        If f contained the two lines above, this function would 
 
190
         concatenate f1.txt, f2.txt, and f3.txt into f_combined.txt
 
191
         and f1.log, f2.log, and f3.log into f_combined.log
94
192
    """
95
 
    
96
 
    result = '%s %s %s -f %s -p %s -m %s -d %s -t %d %s' % \
97
 
     (command_prefix,
98
 
      python_exe_fp,
99
 
      poller_fp,
100
 
      expected_files_filepath,
101
 
      process_run_results_f,
102
 
      merge_map_filepath,
103
 
      deletion_list_filepath,
104
 
      seconds_to_sleep,
105
 
      command_suffix)
106
 
      
107
 
    return result, []
 
193
    infiles_lists,out_filepaths = parse_tmp_to_final_filepath_map_file(f)
 
194
    for infiles_list, out_filepath in zip(infiles_lists,out_filepaths):
 
195
        try:
 
196
            of = open(out_filepath,'w')
 
197
        except IOError:
 
198
            raise IOError,\
 
199
             "Poller can't open final output file: %s" % out_filepath  +\
 
200
             "\nLeaving individual jobs output.\n Do you have write access?"
 
201
 
 
202
        for fp in infiles_list:
 
203
            for line in open(fp):
 
204
               of.write('%s\n' % line.strip('\n'))
 
205
        of.close()
 
206
    # It is a good idea to have your clean_up_callback return True.
 
207
    # That way, if you get mixed up and pass it as check_run_complete_callback, 
 
208
    # you'll get an error right away rather than going into an infinite loop
 
209
    return True