~ubuntu-branches/ubuntu/natty/python-cogent/natty

« back to all changes in this revision

Viewing changes to cogent/util/parallel.py

  • Committer: Bazaar Package Importer
  • Author(s): Steffen Moeller
  • Date: 2010-12-04 22:30:35 UTC
  • mfrom: (1.1.1 upstream)
  • Revision ID: james.westby@ubuntu.com-20101204223035-j11kinhcrrdgg2p2
Tags: 1.5-1
* Bumped standard to 3.9.1, no changes required.
* New upstream version.
  - major additions to Cookbook
  - added AlleleFreqs attribute to ensembl Variation objects.
  - added getGeneByStableId method to genome objects.
  - added Introns attribute to Transcript objects and an Intron class.
  - added Mann-Whitney test and a Monte-Carlo version
  - exploratory and confirmatory period estimation techniques (suitable for
    symbolic and continuous data)
  - Information theoretic measures (AIC and BIC) added
  - drawing of trees with collapsed nodes
  - progress display indicator support for terminal and GUI apps
  - added parser for illumina HiSeq2000 and GAiix sequence files as 
    cogent.parse.illumina_sequence.MinimalIlluminaSequenceParser.
  - added parser to FASTQ files, one of the output options for illumina's
    workflow, also added cookbook demo.
  - added functionality for parsing of SFF files without the Roche tools in
    cogent.parse.binary_sff
  - thousand fold performance improvement to nmds
  - >10-fold performance improvements to some Table operations

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
#!/usr/bin/env python
2
2
from __future__ import with_statement
3
 
import os, logging
 
3
import os, sys
4
4
from contextlib import contextmanager
5
 
 
6
 
 
7
 
__author__ = "Andrew Butterfield"
 
5
import warnings
 
6
 
 
7
__author__ = "Peter Maxwell"
8
8
__copyright__ = "Copyright 2007-2009, The Cogent Project"
9
9
__credits__ = ["Andrew Butterfield", "Peter Maxwell", "Gavin Huttley",
10
10
                "Matthew Wakefield", "Edward Lang"]
11
11
__license__ = "GPL"
12
 
__version__ = "1.4.1"
 
12
__version__ = "1.5.0"
13
13
__maintainer__ = "Gavin Huttley"
14
14
__email__ = "Gavin Huttley"
15
15
__status__ = "Production"
16
16
 
17
 
LOG = logging.getLogger('cogent')
18
 
 
19
17
# A flag to control if excess CPUs are worth a warning.
20
18
inefficiency_forgiven = False
21
19
 
43
41
    SUM = MAX = DOUBLE = 'fake'   
44
42
 
45
43
if os.environ.get('DONT_USE_MPI', 0):
46
 
    mpi = None
 
44
    print >>sys.stderr, 'Not using MPI'
 
45
    MPI = None
47
46
else:
48
47
    try:
49
48
        from mpi4py import MPI
50
49
    except ImportError:
 
50
        warnings.warn('Not using MPI as mpi4py not found', stacklevel=2)
51
51
        MPI = None
52
52
    else:
53
53
        size = MPI.COMM_WORLD.Get_size()
54
 
        LOG.info('MPI: %s processors' % size)
55
54
        if size == 1:
56
55
            MPI = None
57
56
 
58
57
if MPI is None:
59
 
    LOG.info('Not using MPI')
60
58
    def get_processor_name():
61
59
        return os.environ.get('HOSTNAME', 'one')
62
60
    _ParallelisationStack = [_FakeCommunicator()]
64
62
else:
65
63
    get_processor_name = MPI.Get_processor_name
66
64
    _ParallelisationStack = [MPI.COMM_WORLD]
67
 
 
 
65
        
68
66
def sync_random(r):
69
67
    if _ParallelisationStack[-1].Get_size() > 1:
70
68
        state = _ParallelisationStack[-1].bcast(r.getstate(), 0)
104
102
    with mpi_context(sub):
105
103
        yield next
106
104
 
107
 
def map(f,s,show_progress=False):
 
105
def map(f,s):
108
106
    result = []
109
107
    with mpi_split(len(s)) as comm:
110
108
        (size, rank) = (comm.Get_size(), comm.Get_rank())
116
114
                local_result = None
117
115
            split_results = comm.allgather(local_result)[:len(chunk)]
118
116
            result.extend(split_results)
119
 
            if show_progress and output_cpu:
120
 
                print ".", #start+len(chunk)
121
 
    if show_progress and output_cpu:
122
 
        print 
123
117
    return result
124
 
        
125
 
 
126
 
class ParaRandom:
127
 
    """Converts any random number generator with a .random() method
128
 
    into an MPI safe parallel random number generator.
129
 
    This relies on ParaRNG being passed the correct number of processes and rank.
130
 
    Internally ParaRNG assigns a phase for each process so that process n
131
 
    will always get the n th random number in the series.
132
 
    Without this method most random number generators will generate the same
133
 
    series on SMP machines and some MPI clusters.
134
 
    Can safely be used on itself to provide ParaRNG in nested parallelism.
135
 
    
136
 
    Warning: accessing the random number generator passed to this function after
137
 
    passing and without resetting the seed could generate duplicate values to
138
 
    those previously generated by this class or disrupt the phasing
139
 
    
140
 
    Arguments:
141
 
        o    random_number - a random number generator with a .random() method
142
 
             that generates a value between 0.0 and 1.0
143
 
        i    num_proc - the number of processes
144
 
        i    rank - the current processers rank
145
 
    
146
 
    """
147
 
    def __init__(self, random_number, num_proc = 1, rank = 0 ):
148
 
        self._rng = random_number
149
 
        self._num_proc = num_proc
150
 
        self._rank = rank
151
 
        #set the initial position in the random number series
152
 
        for i in range(self._rank):
153
 
            self._rng.random()
154
 
    
155
 
    def random(self):
156
 
        #get the current random number in the series
157
 
        r = self._rng.random()
158
 
        #advance by the number of processors to preposition for next call
159
 
        for i in range(self._num_proc):
160
 
            self._rng.random()
161
 
        return r
162
 
    
163
 
    def seed(self, arg):
164
 
        self._rng.seed(arg)
165
 
    
166
118
 
167
119
output_cpu = getCommunicator().Get_rank() == 0