1
1
#!/usr/bin/env python
2
2
from __future__ import with_statement
4
4
from contextlib import contextmanager
7
# Module metadata.
#
# NOTE(review): this excerpt carried two conflicting ``__author__``
# assignments ("Andrew Butterfield" and "Peter Maxwell"), apparently from
# two interleaved revisions of the file; both names are retained here and
# in ``__credits__`` -- confirm which single attribution the project
# convention expects.
__author__ = "Andrew Butterfield and Peter Maxwell"
__copyright__ = "Copyright 2007-2009, The Cogent Project"
__credits__ = ["Andrew Butterfield", "Peter Maxwell", "Gavin Huttley",
               "Matthew Wakefield", "Edward Lang"]
__license__ = "GPL"
__maintainer__ = "Gavin Huttley"
# NOTE(review): ``__email__`` holds a person's name rather than an email
# address -- confirm the intended maintainer contact address.
__email__ = "Gavin Huttley"
__status__ = "Production"
17
# Module-level logger shared by cogent code; assumes ``import logging``
# appears in the import section of this file (outside this excerpt --
# TODO confirm).
LOG = logging.getLogger('cogent')
19
17
# A flag to control if excess CPUs are worth a warning.
20
18
# Module-wide flag controlling whether using more CPUs than the parallel
# job can exploit triggers a warning; the code that consults it is
# elsewhere in this module, outside this excerpt.
inefficiency_forgiven = False
43
41
# Placeholder reduction/type constants for the no-MPI case; presumably
# rebound to the real mpi4py objects (e.g. MPI.SUM) when MPI loads
# successfully -- confirm against the full module.
SUM = MAX = DOUBLE = 'fake'
45
43
if os.environ.get('DONT_USE_MPI', 0):
44
print >>sys.stderr, 'Not using MPI'
49
48
from mpi4py import MPI
50
49
except ImportError:
50
warnings.warn('Not using MPI as mpi4py not found', stacklevel=2)
53
53
size = MPI.COMM_WORLD.Get_size()
54
LOG.info('MPI: %s processors' % size)
59
LOG.info('Not using MPI')
60
58
def get_processor_name():
    """Name of the processor this process runs on.

    Reads the HOSTNAME environment variable, falling back to 'one'
    when it is unset (the single-machine, non-MPI case).
    """
    try:
        return os.environ['HOSTNAME']
    except KeyError:
        return 'one'
62
60
_ParallelisationStack = [_FakeCommunicator()]
65
63
get_processor_name = MPI.Get_processor_name
66
64
_ParallelisationStack = [MPI.COMM_WORLD]
68
66
def sync_random(r):
69
67
if _ParallelisationStack[-1].Get_size() > 1:
70
68
state = _ParallelisationStack[-1].bcast(r.getstate(), 0)
104
102
with mpi_context(sub):
107
def map(f,s,show_progress=False):
109
107
with mpi_split(len(s)) as comm:
110
108
(size, rank) = (comm.Get_size(), comm.Get_rank())
116
114
local_result = None
117
115
split_results = comm.allgather(local_result)[:len(chunk)]
118
116
result.extend(split_results)
119
if show_progress and output_cpu:
120
print ".", #start+len(chunk)
121
if show_progress and output_cpu:
127
"""Converts any random number generator with a .random() method
128
into an MPI safe parallel random number generator.
129
This relies on ParaRNG being passed the correct number of processes and rank.
130
Internally ParaRNG assigns a phase for each process so that process n
131
will always get the n th random number in the series.
132
Without this method most random number generators will generate the same
133
series on SMP machines and some MPI clusters.
134
Can safely be used on itself to provide ParaRNG in nested parallelism.
136
Warning: accessing the random number generator passed to this function after
137
passing and without resetting the seed could generate duplicate values to
138
those previously generated by this class or disrupt the phasing
141
o random_number - a random number generator with a .random() method
142
that generates a value between 0.0 and 1.0
143
i num_proc - the number of processes
144
i rank - the current processers rank
147
def __init__(self, random_number, num_proc = 1, rank = 0 ):
148
self._rng = random_number
149
self._num_proc = num_proc
151
#set the initial position in the random number series
152
for i in range(self._rank):
156
#get the current random number in the series
157
r = self._rng.random()
158
#advance by the number of processors to preposition for next call
159
for i in range(self._num_proc):
167
119
output_cpu = getCommunicator().Get_rank() == 0