~ubuntu-branches/ubuntu/trusty/qiime/trusty

Viewing changes to qiime/parallel/util.py

  • Committer: Package Import Robot
  • Author(s): Andreas Tille
  • Date: 2013-06-17 18:28:26 UTC
  • mfrom: (9.1.2 sid)
  • Revision ID: package-import@ubuntu.com-20130617182826-376az5ad080a0sfe
  • Tags: 1.7.0+dfsg-1
  • Commit message: Upload preparations done for BioLinux to Debian

Diff of qiime/parallel/util.py, 1.5.0 -> 1.7.0 (unified view): lines prefixed
with '-' were removed, lines prefixed with '+' were added, and lines prefixed
with a space are unchanged context.
 #!/usr/bin/env python
-#util.py
-
+# File created on 07 Jul 2012
 from __future__ import division
-from random import choice
-from os import popen, system, getenv, mkdir
-from subprocess import Popen, PIPE, STDOUT
-from os.path import split
-from math import ceil
-from cogent.parse.fasta import MinimalFastaParser
-from qiime.util import qiime_system_call
 
 __author__ = "Greg Caporaso"
-__copyright__ = "Copyright 2011, The QIIME Project"
-__credits__ = ["Greg Caporaso"]
+__copyright__ = "Copyright 2011, The QIIME project"
+__credits__ = ["Greg Caporaso", "Jens Reeder", "Jai Ram Rideout"]
 __license__ = "GPL"
-__version__ = "1.5.0"
+__version__ = "1.7.0"
 __maintainer__ = "Greg Caporaso"
 __email__ = "gregcaporaso@gmail.com"
 __status__ = "Release"
 
+from math import ceil
+from os.path import split, splitext, join
+from os import makedirs, mkdir
+from random import choice
+from cogent.parse.fasta import MinimalFastaParser
+from qiime.split import split_fasta
+from qiime.util import (load_qiime_config,
+                        get_qiime_scripts_dir,
+                        qiime_system_call,
+                        count_seqs)
+
+qiime_config = load_qiime_config()
+
 RANDOM_JOB_PREFIX_CHARS = "abcdefghijklmnopqrstuvwxyz"
 RANDOM_JOB_PREFIX_CHARS += RANDOM_JOB_PREFIX_CHARS.upper()
 RANDOM_JOB_PREFIX_CHARS += "0123456789"
 
-def split_fasta(infile, seqs_per_file, outfile_prefix, working_dir=''):
-    """ Split infile into files with seqs_per_file sequences in each
-
-        infile: list of fasta lines or open file object
-        seqs_per_file: the number of sequences to include in each file
-        outfile_prefix: string used to create output filepaths -- output
-         filepaths are <outfile_prefix>.<i>.fasta where i runs from 0 to
-         the number of output files
-        working_dir: directory to prepend to temp filepaths (defaults to
-         empty string -- files written to cwd)
-
-        List of output filepaths is returned.
-    """
-    seq_counter = 0
-    out_files = []
-    if working_dir and not working_dir.endswith('/'):
-        working_dir += '/'
-        try:
-            mkdir(working_dir)
+class ParallelWrapper(object):
+    """
+    """
+
+    def __init__(self,
+                 python_exe_fp=qiime_config['python_exe_fp'],
+                 cluster_jobs_fp=qiime_config['cluster_jobs_fp'],
+                 jobs_to_start=int(qiime_config['jobs_to_start']),
+                 poller_fp=join(get_qiime_scripts_dir(),'poller.py'),
+                 retain_temp_files=False,
+                 suppress_polling=False,
+                 seconds_to_sleep=int(qiime_config['seconds_to_sleep'])):
+        """  """
+
+        self._python_exe_fp = python_exe_fp
+        self._cluster_jobs_fp = cluster_jobs_fp
+        self._jobs_to_start = jobs_to_start
+        self._poller_fp = poller_fp
+        self._retain_temp_files = retain_temp_files
+        self._suppress_polling = suppress_polling
+        self._seconds_to_sleep = seconds_to_sleep
+
+    def _call_initialization(self,
+                             input_fp,
+                             output_dir,
+                             params,
+                             job_prefix,
+                             poll_directly,
+                             suppress_submit_jobs):
+        """ Called as the first step in __call__.
+        """
+        pass
+
+    def _call_cleanup(self,
+                      input_fp,
+                      output_dir,
+                      params,
+                      job_prefix,
+                      poll_directly,
+                      suppress_submit_jobs):
+        """ Called as the last step in __call__.
+        """
+        pass
+
+    def __call__(self,
+                 input_fp,
+                 output_dir,
+                 params,
+                 job_prefix=None,
+                 poll_directly=False,
+                 suppress_submit_jobs=False):
+        """ """
+        ## Generate a list of files and directories that will need to be cleaned up
+        self.files_to_remove = []
+
+        # Perform any method-specific setup. This should prevent the need to
+        # overwrite __call__
+        self._call_initialization(input_fp,
+                                  output_dir,
+                                  params,
+                                  job_prefix,
+                                  poll_directly,
+                                  suppress_submit_jobs)
+
+        # split the input filepath into directory and filename, base filename and
+        # extension for use in naming other files
+        try:
+            input_dir, input_fn = split(input_fp)
+            input_file_basename, input_ext = splitext(input_fn)
+        except AttributeError:
+            ## THIS IS AWFUL - SOME OF THE SCRIPTS PASS A LIST, SO THE
+            ## PREVIOUS BLOCK WON'T WORK... WHAT DO WE WANT TO DO?
+            input_dir, input_fn = split(input_fp[0])
+            input_file_basename, input_ext = splitext(input_fn)
+
+        # Allow the user to override the default job_prefix (defined by the
+        # base classes)
+        if job_prefix is None:
+            job_prefix = self._get_random_job_prefix(self._job_prefix)
+        # A temporary output directory is created in output_dir named
+        # job_prefix. Output files are then moved from the temporary
+        # directory to the output directory when they are complete,
+        # allowing a poller to detect when runs complete by the presence
+        # of their output files.
+        working_dir = join(output_dir,job_prefix)
+        try:
+            makedirs(working_dir)
+            self.files_to_remove.append(working_dir)
         except OSError:
+            # working dir already exists
             pass
-
-    for seq_id,seq in MinimalFastaParser(infile):
-        if seq_counter == 0:
-            current_out_fp = '%s%s.%d.fasta' \
-              % (working_dir,outfile_prefix,len(out_files))
-            current_out_file = open(current_out_fp, 'w')
-            out_files.append(current_out_fp)
-        current_out_file.write('>%s\n%s\n' % (seq_id, seq))
-        seq_counter += 1
-
-        if seq_counter == seqs_per_file:
-            current_out_file.close()
-            seq_counter = 0
-
-    return out_files
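
A minimal usage sketch for the removed module-level split_fasta (illustration
only, not part of the diff; the input file 'seqs.fasta' and the chunk size of
100 are assumptions):

    # Split a fasta file into chunks of at most 100 sequences each,
    # written to the current working directory (1.5.0 module layout).
    from qiime.parallel.util import split_fasta
    out_fps = split_fasta(open('seqs.fasta'), 100, 'my_run')
    print out_fps   # e.g. ['my_run.0.fasta', 'my_run.1.fasta', ...]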
-
-def merge_to_n_commands(commands,n,delimiter=' ; ',
-    command_prefix=None,command_suffix=None):
-    """ merge a list of commands into n commands """
-    if n < 1:
-        raise ValueError, "n must be an integer >= 1"
-
-    if command_prefix == None:
-        command_prefix = '/bin/bash ;'
-    else:
-        command_prefix = command_prefix
-
-    if command_suffix == None:
-        command_suffix = '; exit'
-    else:
-        command_suffix = command_suffix
-
-    result = []
-    commands_per_merged_command = int(ceil((len(commands)/n)))
-    # begin iterating through the commands
-    cmd_counter = 0
-    current_cmds = []
-    for command in commands:
-        current_cmds.append(command)
-        cmd_counter += 1
-
-        if cmd_counter == commands_per_merged_command:
-            result.append(delimiter.join(current_cmds))
-            current_cmds = []
-            cmd_counter = 0
-
-    if current_cmds:
-        result[-1] = delimiter.join([result[-1]] + current_cmds)
-
-    for i,r in enumerate(result):
-        r = '%s %s %s' % (command_prefix, r, command_suffix)
-        result[i] = r.strip()
-    # result = ['%s %s %s' % (command_prefix, r, command_suffix)
-    #           for r in result]
-
-    return result
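
To make the merge arithmetic concrete, a sketch (illustration only; the echo
commands are assumptions). Leftover commands are folded into the last merged
entry, so the function guarantees at most n merged commands, not exactly n:

    cmds = ['echo a', 'echo b', 'echo c', 'echo d', 'echo e']
    merge_to_n_commands(cmds, 2, command_prefix='', command_suffix='')
    # ceil(5/2) = 3 commands per merged command; the trailing two are folded
    # into the last entry, so here all five end up in one merged command:
    # ['echo a ; echo b ; echo c ; echo d ; echo e']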
-
-def get_random_job_prefix(fixed_prefix='',max_job_prefix_len=10,\
-    leading_trailing_underscores=True):
-    """ Return a string to use as job prefix
-    """
-
-    length = max_job_prefix_len - len(fixed_prefix)
-    if leading_trailing_underscores:
-        length -= 2
-
-    result = [choice(RANDOM_JOB_PREFIX_CHARS) for i in range(length)]
-    if leading_trailing_underscores:
-        return fixed_prefix + '_' + ''.join(result) + '_'
-    else:
-        return fixed_prefix + ''.join(result)
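
The prefix arithmetic, as a sketch (illustration only): with the default
max_job_prefix_len of 10, a fixed prefix 'ALIGN' leaves 10 - 5 - 2 = 3 random
characters between the underscores:

    get_random_job_prefix('ALIGN')   # e.g. 'ALIGN_R7q_'
    get_random_job_prefix(leading_trailing_underscores=False)   # e.g. 'hZ4pQ92LsT'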
-
-def write_jobs_file(commands,job_prefix=None,jobs_fp=None):
-    """ Write commands to jobs_fp and return jobs_fp
-    """
-
-    if jobs_fp:
-        jobs_fp = jobs_fp
-    elif job_prefix:
-        jobs_fp = job_prefix + 'jobs.txt'
-    else:
-        jobs_fp = 'jobs.txt'
-
-    open(jobs_fp,'w').write('\n'.join(commands))
-
-    return jobs_fp
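
A usage sketch (illustration only; the pick_otus.py commands are assumptions):

    fp = write_jobs_file(['pick_otus.py -i a.fasta', 'pick_otus.py -i b.fasta'],
                         job_prefix='OTU_')
    # fp == 'OTU_jobs.txt', containing one command per line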
-
-def submit_jobs(path_to_cluster_jobs, jobs_fp, job_prefix):
-    """ Submit the jobs to the queue using cluster_jobs.py
-    """
-    cmd = '%s -ms %s %s' % (path_to_cluster_jobs, jobs_fp, job_prefix)
-    stdout, stderr, return_value = qiime_system_call(cmd)
-    if return_value != 0:
-        msg = "\n\n*** Could not start parallel jobs. \n" +\
-         "Command run was:\n %s\n" % cmd +\
-         "Command returned exit status: %d\n" % return_value +\
-         "Stdout:\n%s\nStderr\n%s\n" % (stdout,stderr)
-        raise RuntimeError, msg
-
-    # Leave these comments in; they're useful for debugging.
-    # print 'Return value: %d\n' % return_value
-    # print 'STDOUT: %s\n' % stdout
-    # print 'STDERR: %s\n' % stderr
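
A usage sketch (illustration only; assumes a cluster_jobs.py script on the
PATH). In QIIME's bundled cluster_jobs scripts, -m makes the queue job files
and -s submits them:

    submit_jobs('cluster_jobs.py', 'OTU_jobs.txt', 'OTU_')
    # runs: cluster_jobs.py -ms OTU_jobs.txt OTU_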
-
-def compute_seqs_per_file(input_fasta_fp,num_jobs_to_start):
-    """ Compute the number of sequences to include in each split file
-    """
-    # count the number of sequences in the fasta file
-    num_input_seqs = \
-     int(popen("sed -n -e '/^>/p' %s | wc -l" % input_fasta_fp).read().strip())
+
+        # Split the input file into the individual job input files. Add the
+        # individual job files to the files_to_remove list
+        input_fps, remove_input_on_completion = self._input_splitter(
+                                         input_fp,
+                                         params,
+                                         self._jobs_to_start,
+                                         job_prefix,
+                                         working_dir)
+        if remove_input_on_completion:
+            self.files_to_remove += input_fps
+
+        # Perform any method-specific setup (e.g., formatting a BLAST database)
+        self._precommand_initiation(input_fp,output_dir,working_dir,params)
+
+        # Generate the list of commands to be pushed out to workers
+        # and the list of output files generated by each job.
+        commands, job_result_filepaths = self._get_job_commands(input_fps,
+                                                                output_dir,
+                                                                params,
+                                                                job_prefix,
+                                                                working_dir)
+        self.files_to_remove += \
+         self._identify_files_to_remove(job_result_filepaths,params)
+
+        # Generate the output clean-up files
+        merge_map_filepath, deletion_list_filepath, expected_files_filepath =\
+         self._initialize_output_cleanup_files(job_result_filepaths,
+                                               output_dir,
+                                               working_dir,
+                                               input_file_basename,
+                                               params)
+
+        # Set up poller apparatus if the user does not suppress polling
+        if not self._suppress_polling:
+            poller_command = self._initiate_polling(job_result_filepaths,
+                                                    working_dir,
+                                                    poll_directly,
+                                                    merge_map_filepath,
+                                                    deletion_list_filepath,
+                                                    expected_files_filepath)
+
+        # If the poller should be run in the same way as the other commands
+        # (rather than by the current process), add it to the list of commands
+        if not poll_directly:
+            commands.append(poller_command)
 
-    # divide the number of sequences by the number of jobs to start
-    result = num_input_seqs/num_jobs_to_start
-
-    # if we don't have a perfect split, round up
-    if result % 1 != 0:
-        result += 1
-
-    # return the result as an integer
-    return int(result)
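
The rounding behavior, concretely (illustration only; assumes 'seqs.fasta'
holds 1000 sequences):

    compute_seqs_per_file('seqs.fasta', 3)
    # 1000/3 = 333.33... -> not a perfect split, so round up to 334;
    # splitting then yields files of 334, 334, and 332 sequences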
-
-def build_filepaths_from_filepaths(filepaths,prefix='',directory='',\
-    suffix='',replacement=('','')):
-    """ Returns a modified list of filepaths
-
-        Modifications to each filepath, in order, are:
-         1- strip path to convert filepath to filename
-             (e.g., mydir/out.txt => out.txt)
-         2- replace all occurrences of replacement[0] with replacement[1]
-         3- join directory, prefix, filename, suffix
-
-        Order of results corresponds to order of input.
-    """
-
-    if directory and not directory.endswith('/'):
-        directory += '/'
-
-    results = []
-    replace_from = replacement[0]
-    replace_to = replacement[1]
-
-    for fp in filepaths:
-        file_dir, filename = split(fp)
-        filename = filename.replace(replace_from,replace_to)
-        result = '%s%s%s%s' % (directory,prefix,filename,suffix)
-        results.append(result)
-
-    return results
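
A usage sketch (illustration only; the paths are assumptions):

    build_filepaths_from_filepaths(['run1/seqs.fasta'],
                                   prefix='tmp_', directory='out',
                                   suffix='.txt', replacement=('.fasta',''))
    # -> ['out/tmp_seqs.txt']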
-
-
-def get_poller_command(python_exe_fp,poller_fp,expected_files_filepath,\
-    merge_map_filepath,deletion_list_filepath,seconds_to_sleep,\
-    command_prefix='/bin/bash; ',command_suffix='; exit'):
-    """Generate command to initiate a poller to monitor/process completed runs
-    """
-
-    result = '%s %s %s -f %s -m %s -d %s -t %d %s' % \
-     (command_prefix,
-      python_exe_fp,
-      poller_fp,
-      expected_files_filepath,
-      merge_map_filepath,
-      deletion_list_filepath,
-      seconds_to_sleep,
-      command_suffix)
 
+        # Build the filepath for the 'jobs script'. Add that file to the
+        # files_to_remove list.
+        jobs_fp = join(working_dir,job_prefix + 'jobs.txt')
+        self._write_jobs_file(commands,jobs_fp)
+        self.files_to_remove.append(jobs_fp)
+
+        # submit the jobs file using cluster_jobs, if not suppressed by the
+        # user
+        if not suppress_submit_jobs:
+            stdout, stderr, return_value = self._submit_jobs(
+             jobs_fp=jobs_fp, job_prefix=job_prefix)
+
+        # If the poller is going to be run by the current process,
+        # start polling
+        if poll_directly:
+            # IMPORTANT: the following line MUST use qiime_system_call()
+            # instead of subprocess.call, .check_call, or .check_output in case
+            # we are invoked in a child process with PIPEs (a deadlock will
+            # occur otherwise). This can happen if this code is tested by
+            # all_tests.py, for example.
+            stdout, stderr, return_value = qiime_system_call(poller_command)
+            if return_value != 0:
+                print '**Error occurred when calling the poller directly. '+\
+                'Jobs may have been submitted, but are not being polled.'
+                print stderr
+                print poller_command
+                exit(-1)
+        self.files_to_remove = []
+
+        # Perform any method-specific cleanup. This should prevent the need to
+        # overwrite __call__
+        self._call_cleanup(input_fp,
+                           output_dir,
+                           params,
+                           job_prefix,
+                           poll_directly,
+                           suppress_submit_jobs)
+
+    def _initialize_output_cleanup_files(self,
+                                         job_result_filepaths,
+                                         output_dir,
+                                         working_dir,
+                                         input_file_basename,
+                                         params):
+        # Write the mapping file which describes how the output files from
+        # each job should be merged into the final output files
+        merge_map_filepath = '%s/merge_map.txt' % working_dir
+        self._write_merge_map_file(input_file_basename,
+                                   job_result_filepaths,
+                                   params,
+                                   output_dir,
+                                   merge_map_filepath)
+        self.files_to_remove.append(merge_map_filepath)
+
+        # Create the filepath listing the temporary files to be deleted,
+        # but don't write it yet
+        deletion_list_filepath = '%s/deletion_list.txt' % working_dir
+        self.files_to_remove.append(deletion_list_filepath)
+
+        # Write the list of files which must exist for the jobs to be
+        # considered complete
+        expected_files_filepath = '%s/expected_out_files.txt' % working_dir
+        self._write_filepaths_to_file(job_result_filepaths,
+                                      expected_files_filepath)
+        self.files_to_remove.append(expected_files_filepath)
+
+        return (merge_map_filepath,
+                deletion_list_filepath,
+                expected_files_filepath)
+
+    def _submit_jobs(self,
+                     jobs_fp,
+                     job_prefix):
+        """ Submit the jobs to the queue using cluster_jobs.py
+        """
+        cmd = '%s -ms %s %s' % (self._cluster_jobs_fp,
+                                jobs_fp,
+                                job_prefix)
+        stdout, stderr, return_value = qiime_system_call(cmd)
+        if return_value != 0:
+            msg = "\n\n*** Could not start parallel jobs. \n" +\
+             "Command run was:\n %s\n" % cmd +\
+             "Command returned exit status: %d\n" % return_value +\
+             "Stdout:\n%s\nStderr\n%s\n" % (stdout,stderr)
+            raise RuntimeError, msg
+
+        # Leave these comments in; they're useful for debugging.
+        # print 'Return value: %d\n' % return_value
+        # print 'STDOUT: %s\n' % stdout
+        # print 'STDERR: %s\n' % stderr
+
+        return stdout, stderr, return_value
261
 
 
262
    def _identify_files_to_remove(self,job_result_filepaths,params):
 
263
        """ Select the files to remove: by default remove all files
 
264
        """
 
265
        return job_result_filepaths
 
266
    
 
267
    def _precommand_initiation(self,input_fp,output_dir,working_dir,params):
 
268
        pass
 
269
    
 
270
    def _write_merge_map_file(self,
 
271
                              input_file_basename,
 
272
                              job_result_filepaths,
 
273
                              params,
 
274
                              output_dir,
 
275
                              merge_map_filepath):
 
276
        """ Create an empty file by default. Most subclasses will overwrite this 
 
277
        """
 
278
        open(merge_map_filepath,'w').close()
 
279
 
 
280
    def _get_random_job_prefix(self, 
 
281
                               fixed_prefix='',
 
282
                               max_job_prefix_len=10,\
 
283
                               leading_trailing_underscores=True):
 
284
        """ Return a string to use as job prefix """
 
285
 
 
286
        length = max_job_prefix_len - len(fixed_prefix)
 
287
        if leading_trailing_underscores:
 
288
            length -= 2 
 
289
    
 
290
        result = [choice(RANDOM_JOB_PREFIX_CHARS) for i in range(length)]
 
291
        if leading_trailing_underscores:
 
292
            return fixed_prefix + '_' + ''.join(result) + '_'
 
293
        else:
 
294
            return fixed_prefix + ''.join(result)
 
295
 
 
296
    def _get_job_commands(self,
 
297
                          input_fps,
 
298
                          output_dir,
 
299
                          params,
 
300
                          job_prefix,
 
301
                          working_dir,
 
302
                          command_prefix='/bin/bash; ',
 
303
                          command_suffix='; exit'):
 
304
        raise NotImplementedError, "Subclass must override _get_jobs_commands"
 
+
+    def _initiate_polling(self,
+                          job_result_filepaths,
+                          working_dir,
+                          poll_directly,
+                          merge_map_filepath,
+                          deletion_list_filepath,
+                          expected_files_filepath):
+        # Generate the command to run the poller, and the list of temp files
+        # created by the poller
+        if not poll_directly:
+            poller_command, poller_result_filepaths =\
+             self._get_poller_command(expected_files_filepath,
+                                      merge_map_filepath,
+                                      deletion_list_filepath)
+        else:
+            # this 'else' exists only because of the command_prefix/suffix bullshit,
+            # which needs to be refactored (related to trac #109)
+            poller_command, poller_result_filepaths =\
+             self._get_poller_command(expected_files_filepath,
+                                      merge_map_filepath,
+                                      deletion_list_filepath,
+                                      command_prefix='',
+                                      command_suffix='')
+
+        self.files_to_remove += poller_result_filepaths
+
+        if not self._retain_temp_files:
+            # If the user wants temp files deleted, now write the list of
+            # temp files to be deleted
+            self._write_filepaths_to_file(self.files_to_remove,
+                                    deletion_list_filepath)
+        else:
+            # Otherwise just write an empty file
+            self._write_filepaths_to_file([],
+                                    deletion_list_filepath)
+
+        return poller_command
+
+    def _get_poller_command(self,
+                            expected_files_filepath,
+                            merge_map_filepath,
+                            deletion_list_filepath,
+                            command_prefix='/bin/bash; ',
+                            command_suffix='; exit'):
+        """Generate command to initiate a poller to monitor/process completed runs
+        """
+
+        result = '%s poller.py -f %s -m %s -d %s -t %d %s' % \
+         (command_prefix,
+          expected_files_filepath,
+          merge_map_filepath,
+          deletion_list_filepath,
+          self._seconds_to_sleep,
+          command_suffix)
 
-    return result, []
-
-def get_rename_command(out_filenames,tmp_output_dir,output_dir):
-    """Generate commands to move out_filenames from tmp_output_dir to output_dir
+        return result, []
 
+
+    def _write_filepaths_to_file(self,
+                                 job_result_filepaths,
+                                 expected_files_filepath):
+        f = open(expected_files_filepath,'w')
+        f.write('\n'.join(job_result_filepaths))
+        f.close()
+
+    def _get_rename_command(self,
+                            out_filenames,
+                            tmp_output_dir,
+                            output_dir):
+        """Generate commands to move out_filenames from tmp_output_dir to output_dir
+        """
+        result = ''
+        result_filepaths = []
+        for fn in out_filenames:
+            tmp_result_filepath = '%s/%s' % (tmp_output_dir,fn)
+            result_filepath = '%s/%s' % (output_dir,fn)
+            result += \
+             '; mv %s %s' % (tmp_result_filepath,result_filepath)
+            result_filepaths.append(result_filepath)
+        return result, result_filepaths
+
+    def _write_jobs_file(self,
+                         commands,
+                         jobs_fp):
+        """ Write commands to jobs_fp and return jobs_fp """
+        open(jobs_fp,'w').write('\n'.join(commands))
+        return jobs_fp
+
+    def _merge_to_n_commands(self,
+                             commands,
+                             n,
+                             delimiter=' ; ',
+                             command_prefix=None,
+                             command_suffix=None):
+        """ merge a list of commands into n commands
+
+            This is used by parallel wrappers such as alpha_diversity and
+             beta_diversity which perform an operation on a collection of
+             input files (as opposed to the scripts that split an input file
+             into the user-specified number of jobs).
+        """
+        if n < 1:
+            raise ValueError, "number of commands (n) must be an integer >= 1"
+
+        commands_to_filter = []
+        if command_prefix == None:
+            command_prefix = '/bin/bash ;'
+            commands_to_filter.append('/bin/bash')
+        else:
+            commands_to_filter += [c.strip() for c in command_prefix.split(';') if c.strip()]
+
+        if command_suffix == None:
+            command_suffix = '; exit'
+            commands_to_filter.append('exit')
+        else:
+            commands_to_filter += [c.strip() for c in command_suffix.split(';') if c.strip()]
+
+        result = []
+        commands_per_merged_command = int(ceil((len(commands)/n)))
+        # begin iterating through the commands
+        cmd_counter = 0
+        current_cmds = []
+        for command in commands:
+            subcommands = [c.strip() for c in command.split(';')]
+            current_cmds.append(delimiter.join([s for s in subcommands if s not in commands_to_filter]))
+            cmd_counter += 1
+
+            if cmd_counter == commands_per_merged_command:
+                result.append(delimiter.join(current_cmds))
+                current_cmds = []
+                cmd_counter = 0
+
+        if current_cmds:
+            result[-1] = delimiter.join([result[-1]] + current_cmds)
+
+        for i,r in enumerate(result):
+            r = '%s %s %s' % (command_prefix, r, command_suffix)
+            result[i] = r.strip()
+
+        return result
 
+
+    def _compute_seqs_per_file(self,
+                               input_fasta_fp,
+                               num_jobs_to_start):
+        """ Compute the number of sequences to include in each split file
+        """
+        # count the number of sequences in the fasta file
+        num_input_seqs = count_seqs(input_fasta_fp)[0]
+
+        # divide the number of sequences by the number of jobs to start
+        result = num_input_seqs/num_jobs_to_start
+
+        # if we don't have a perfect split, round up
+        if result % 1 != 0:
+            result += 1
+
+        # return the result as an integer
+        return int(result)
+
+    ####
+    # General purpose _input_splitter functions
+    ####
+    def _split_fasta(self,
+                     input_fp,
+                     params,
+                     jobs_to_start,
+                     job_prefix,
+                     output_dir):
+        # compute the number of sequences that should be included in
+        # each file after splitting the input fasta file
+        num_seqs_per_file = self._compute_seqs_per_file(input_fp,jobs_to_start)
+
+        # split the fasta files and get the list of resulting files
+        tmp_fasta_fps =\
+          split_fasta(open(input_fp),num_seqs_per_file,\
+          job_prefix,working_dir=output_dir)
+
+        return tmp_fasta_fps, True
+
+    def _input_existing_filepaths(self,
+                                  input_fps,
+                                  params,
+                                  jobs_to_start,
+                                  job_prefix,
+                                  output_dir):
+        return input_fps, False
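
A minimal subclass sketch (illustration only, not part of the diff;
ToyParallelRunner, its 'TOY' prefix, and my_script.py are hypothetical).
Concrete wrappers supply a _job_prefix and an _input_splitter and override
_get_job_commands; splitting, submission, polling, and cleanup are inherited
from ParallelWrapper.__call__:

    class ToyParallelRunner(ParallelWrapper):

        _job_prefix = 'TOY'
        # reuse the general-purpose fasta splitter defined above
        _input_splitter = ParallelWrapper._split_fasta

        def _get_job_commands(self, input_fps, output_dir, params, job_prefix,
                              working_dir, command_prefix='/bin/bash; ',
                              command_suffix='; exit'):
            commands = []
            result_filepaths = []
            for i, input_fp in enumerate(input_fps):
                out_fp = '%s/%s.%d.txt' % (working_dir, job_prefix, i)
                commands.append('%s my_script.py -i %s -o %s %s' %
                                (command_prefix, input_fp, out_fp, command_suffix))
                result_filepaths.append(out_fp)
            return commands, result_filepaths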
 
+
+class BufferedWriter():
+    """A file-like object that delays writing to file without keeping an open filehandle
+
+    This class comes in handy in scenarios where potentially many open filehandles
+    are needed (e.g. during splitting of inputs for parallelization). Since the OS
+    limits the number of filehandles that may be open at any one time, we provide a
+    filehandle-like class that can be used much like a regular (writable) filehandle,
+    but without keeping the filehandle open permanently. Using a larger buffer size
+    reduces the number of expensive open/close IO operations.
     """
-    result = ''
-    result_filepaths = []
-    for fn in out_filenames:
-        tmp_result_filepath = '%s/%s' % (tmp_output_dir,fn)
-        result_filepath = '%s/%s' % (output_dir,fn)
-        result += \
-         '; mv %s %s' % (tmp_result_filepath,result_filepath)
-        result_filepaths.append(result_filepath)
-    return result, result_filepaths
-
+
+    def __init__(self, filename, buf_size=100):
+        """
+        filename: name of file to write to in append mode
 
-def write_filepaths_to_file(job_result_filepaths,expected_files_filepath):
-    f = open(expected_files_filepath,'w')
-    f.write('\n'.join(job_result_filepaths))
-    f.close()
-
-
-def write_merge_map_file_align_seqs(job_result_filepaths,output_dir,\
-    merge_map_filepath,input_file_basename):
-
-    f = open(merge_map_filepath,'w')
-
-    out_filepaths = ['%s/%s_aligned.fasta' % (output_dir,input_file_basename),
-                     '%s/%s_failures.fasta' % (output_dir,input_file_basename),
-                     '%s/%s_log.txt' % (output_dir,input_file_basename)]
-
-    aligned_fps = []
-    failures_fps = []
-    log_fps = []
-
-    for fp in job_result_filepaths:
-        if fp.endswith('_aligned.fasta'):
-            aligned_fps.append(fp)
-        elif fp.endswith('_failures.fasta'):
-            failures_fps.append(fp)
-        else:
-            log_fps.append(fp)
-
-    for in_files, out_file in\
-     zip([aligned_fps,failures_fps,log_fps],out_filepaths):
-        f.write('\t'.join(in_files + [out_file]))
-        f.write('\n')
-    f.close()
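
The resulting merge map is tab-separated, one line per final output file, with
the per-job partial outputs first and the merged target last. A sketch
(illustration only; the filenames are assumptions):

    write_merge_map_file_align_seqs(
        ['job0_aligned.fasta', 'job0_failures.fasta', 'job0_log.txt',
         'job1_aligned.fasta', 'job1_failures.fasta', 'job1_log.txt'],
        'out', 'merge_map.txt', 'seqs')
    # merge_map.txt then contains three tab-separated lines:
    #   job0_aligned.fasta   job1_aligned.fasta   out/seqs_aligned.fasta
    #   job0_failures.fasta  job1_failures.fasta  out/seqs_failures.fasta
    #   job0_log.txt         job1_log.txt         out/seqs_log.txt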
-def write_merge_map_file_pick_otus(job_result_filepaths,output_dir,\
-    merge_map_filepath,input_file_basename,failures=False):
-
-    f = open(merge_map_filepath,'w')
-
-    otus_fps = []
-    log_fps = []
-    failures_fps = []
-
-    if not failures:
-        out_filepaths = [\
-         '%s/%s_otus.txt' % (output_dir,input_file_basename),
-         '%s/%s_otus.log' % (output_dir,input_file_basename)]
-        in_filepaths = [otus_fps,log_fps]
-    else:
-        out_filepaths = [\
-         '%s/%s_otus.txt' % (output_dir,input_file_basename),
-         '%s/%s_otus.log' % (output_dir,input_file_basename),
-         '%s/%s_failures.txt' % (output_dir,input_file_basename)]
-        in_filepaths = [otus_fps,log_fps,failures_fps]
-
-    for fp in job_result_filepaths:
-        if fp.endswith('_otus.txt'):
-            otus_fps.append(fp)
-        elif fp.endswith('_otus.log'):
-            log_fps.append(fp)
-        elif fp.endswith('_failures.txt'):
-            failures_fps.append(fp)
-        else:
-            pass
-
-    for in_files, out_file in\
-     zip(in_filepaths,out_filepaths):
-        f.write('\t'.join(in_files + [out_file]))
-        f.write('\n')
-    f.close()
-
-
-def write_merge_map_file_assign_taxonomy(job_result_filepaths,output_dir,\
-    merge_map_filepath,input_file_basename):
-
-    f = open(merge_map_filepath,'w')
-
-    out_filepaths = [\
-     '%s/%s_tax_assignments.txt' % (output_dir,input_file_basename),
-     '%s/%s_tax_assignments.log' % (output_dir,input_file_basename)]
-
-    assignment_fps = []
-    log_fps = []
-
-    for fp in job_result_filepaths:
-        if fp.endswith('_tax_assignments.txt'):
-            assignment_fps.append(fp)
-        else:
-            log_fps.append(fp)
-
-    for in_files, out_file in\
-     zip([assignment_fps,log_fps],out_filepaths):
-        f.write('\t'.join(in_files + [out_file]))
-        f.write('\n')
-    f.close()
-
-def write_merge_map_file_blast(job_result_filepaths,output_dir,\
-    merge_map_filepath,input_file_basename):
-
-    f = open(merge_map_filepath,'w')
-    out_filepath =\
-     '%s/%s_blast_out.txt' % (output_dir,input_file_basename)
-    f.write('\t'.join(job_result_filepaths + [out_filepath]))
-    f.write('\n')
-    f.close()
-
-def write_merge_map_file_identify_chimeric_seqs(job_result_filepaths,output_dir,\
-    merge_map_filepath,input_file_basename, out_fp=None):
-
-    f = open(merge_map_filepath,'w')
-
-    if out_fp:
-        out_filepaths = [out_fp]
-    else:
-        out_filepaths = [\
-            '%s/%s_chimeric.txt' % (output_dir,input_file_basename)]
-
-    chims_fps = []
-    logs_fps = [] # logs_fps currently not used
-
-    for fp in job_result_filepaths:
-        if fp.endswith('_chimeric.txt'):
-            chims_fps.append(fp)
-        else:
-            logs_fps.append(fp)
-
-    for in_files, out_file in\
-     zip([chims_fps],out_filepaths):
-        f.write('\t'.join(in_files + [out_file]))
-        f.write('\n')
-    f.close()
 
+        buf_size: buffer size in chunks. Each write operation counts as one chunk.
+        """
+
+        if buf_size < 1:
+            raise ValueError("Invalid buf_size. Must be 1 or larger.")
+
+        self.buffer = []
+        self.buf_size = buf_size
+        self.filename = filename
+
+        # touch the file
+        fh = open(self.filename, "w")
+        fh.close()
+
+    def __del__(self):
+        self._flush()
+
+    def close(self):
+        self._flush()
+
+    def write(self, line):
+        """write line to BufferedWriter"""
+
+        self.buffer.append(line)
+        if len(self.buffer) > self.buf_size:
+            self._flush()
+
+    def _flush(self):
+        """Write buffer to file"""
+
+        fh = open(self.filename, "a")
+        fh.write("".join(self.buffer))
+        fh.close()
+
+        self.buffer = []
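
A usage sketch (illustration only): many logical output files can be written
through BufferedWriter while at most one OS filehandle is open at any moment,
and only for the duration of a flush:

    writers = [BufferedWriter('chunk_%d.fasta' % i, buf_size=500)
               for i in range(200)]
    records = ['>seq1\nACGT\n', '>seq2\nTTGG\n']
    for i, rec in enumerate(records):
        writers[i % 200].write(rec)
    for w in writers:
        w.close()   # flushes anything still buffered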