2
/// This file is part of Rheolef.
4
/// Copyright (C) 2000-2009 Pierre Saramito <Pierre.Saramito@imag.fr>
6
/// Rheolef is free software; you can redistribute it and/or modify
7
/// it under the terms of the GNU General Public License as published by
8
/// the Free Software Foundation; either version 2 of the License, or
9
/// (at your option) any later version.
11
/// Rheolef is distributed in the hope that it will be useful,
12
/// but WITHOUT ANY WARRANTY; without even the implied warranty of
13
/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14
/// GNU General Public License for more details.
16
/// You should have received a copy of the GNU General Public License
17
/// along with Rheolef; if not, write to the Free Software
18
/// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
20
/// =========================================================================
21
#include "rheolef/config.h"
22
#ifdef _RHEOLEF_HAVE_MPI
24
#include "rheolef/hack_array.h"
28
// ===============================================================
30
// ===============================================================
31
// Constructors (fragment: interior source lines are missing from this chunk;
// the bare numeric lines below are extraction artifacts -- original line
// numbers -- not code).
// 1) allocator-only constructor: body not visible here -- presumably builds
//    an empty distributed array; TODO confirm against the full source.
template<class T, class A>
32
hack_array_mpi_rep<T,A>::hack_array_mpi_rep (const A& alloc)
41
// 2) sized constructor: forwards the ownership (the distributor describing
//    the per-process partition of indices), the element parameter and the
//    allocator to the sequential base class; the rest of the initializer
//    list is not visible in this chunk.
template<class T, class A>
42
hack_array_mpi_rep<T,A>::hack_array_mpi_rep (const distributor& ownership, const parameter_type& param, const A& alloc)
43
: base(ownership,param,alloc),
51
// resize -- re-distribute the container for a new ownership and element
// parameter. Fragment: return type, braces and some statements are missing
// from this chunk; bare numeric lines are extraction artifacts.
template <class T, class A>
53
hack_array_mpi_rep<T,A>::resize (const distributor& ownership, const parameter_type& param)
55
// delegate the actual storage resize to the sequential base class
base::resize (ownership, param);
59
// drop any receive state left over from a previous assembly cycle
_receive.waits.clear();
60
_receive.data.clear();
61
_receive_max_size = 0;
63
// ===============================================================
65
// ===============================================================
66
// set_dis_entry -- write a value at a global (distributed) index dis_i.
// Fragment: return type, braces and the owned-case assignment are missing
// from this chunk; bare numeric lines are extraction artifacts.
template <class T, class A>
68
hack_array_mpi_rep<T,A>::set_dis_entry (size_type dis_i, const generic_value_type& val)
70
// owned case: convert the global index to a local one (the local store
// that presumably follows is not visible here -- TODO confirm)
if (ownership().is_owned (dis_i)) {
71
size_type i = dis_i - ownership().first_index();
75
// non-owned case: stage the raw values into _stash; they are exchanged
// later by dis_entry_assembly_begin/end
assert_macro (dis_i < ownership().dis_size(), "index "<<dis_i
76
<< " is out of range [0:" << ownership().dis_size() << "[");
77
// loop on the raw data vector (fixed size, run-time known)
78
size_type dis_iraw = base::_value_size*dis_i + 1;
79
typename generic_value_type::const_iterator iter = val._data_begin();
80
for (size_type loc_iraw = 0, loc_nraw = val._data_size(); loc_iraw < loc_nraw; loc_iraw++, iter++) {
81
// key = global raw index, value = one raw scalar of the element
_stash.insert (std::pair<const size_type,raw_type>(dis_iraw + loc_iraw, *iter));
84
// dis_entry_assembly_begin -- start the (apparently non-blocking) exchange
// of stashed non-local writes between processes. Fragment: return type,
// braces and the trailing arguments of mpi_assembly_begin are missing from
// this chunk; bare numeric lines are extraction artifacts.
template <class T, class A>
86
hack_array_mpi_rep<T,A>::dis_entry_assembly_begin ()
88
// remember the largest expected receive so buffers can be sized later
_receive_max_size = mpi_assembly_begin (
90
// iterate over the stashed pairs, exposing only their keys (global
// raw indices) via first_op
make_apply_iterator(_stash.begin(), first_op<typename stash_map_type::value_type>()),
91
make_apply_iterator(_stash.end(), first_op<typename stash_map_type::value_type>()),
92
raw_base::ownership(),
98
// dis_entry_assembly_end -- finish the exchange started by
// dis_entry_assembly_begin and clear the transient state. Fragment: the
// opening of the (presumed) mpi_assembly_end call is missing from this
// chunk; bare numeric lines are extraction artifacts.
template <class T, class A>
100
hack_array_mpi_rep<T,A>::dis_entry_assembly_end()
107
// destination iterator shifted so that global raw indices address the
// local storage directly
raw_base::begin() - raw_base::ownership().first_index(),
108
// combine operation: plain overwrite of the destination raw value
set_op<raw_type,raw_type>(),
110
boost::mpl::false_()));
114
// reset the receive bookkeeping now that assembly is complete
_receive.waits.clear();
115
_receive.data.clear();
116
_receive_max_size = 0;
118
// ===============================================================
120
// ===============================================================
121
/** @brief get values from ext_idx_set, that are managed by another proc
122
 * new version: instead of sending automatic_data_type, send data_size*raw_value_type to boost::mpi
123
 * => should work with mpi::boost and use simple MPI_Datatype instead of mallocated complex one
 *
 * Fills ext_idx_map with pairs (external global index ; fetched value).
 * Fragment: return type, braces and the opening of two scatter calls are
 * missing from this chunk; bare numeric lines are extraction artifacts
 * (original line numbers).
 */
125
template <class T, class A>
126
template <class Set, class Map>
128
hack_array_mpi_rep<T,A>::append_dis_entry (const Set& ext_idx_set, Map& ext_idx_map) const
130
// 0) declare the local context for raw data
131
scatter_message<std::vector<raw_type> > raw_from;
132
scatter_message<std::vector<raw_type> > raw_to;
134
// 1) convert set to vector, for direct access & multiply by data_size
135
std::vector<size_type> raw_ext_idx (base::_data_size*ext_idx_set.size());
136
typename Set::const_iterator iter = ext_idx_set.begin();
137
for (size_type i = 0; i < ext_idx_set.size(); i++, iter++) {
138
size_type idx = *iter;
139
// one element occupies _value_size raw slots but carries only
// _data_size payload slots; the offset below skips the header part
for (size_type l = 0; l < base::_data_size; l++) {
140
size_type raw_i = base::_data_size*i + l;
141
size_type raw_idx = base::_value_size*idx + l + (base::_value_size - base::_data_size);
142
raw_ext_idx [raw_i] = raw_idx;
145
// 2) declare id[i]=i for scatter
146
std::vector<size_type> raw_id (raw_ext_idx.size());
147
for (size_type i = 0; i < raw_id.size(); i++) raw_id[i] = i;
150
// 3) build a raw-scalar view of the distribution (each element expands
// to _value_size raw entries) -- the call these arguments belong to is
// not visible in this chunk
size_type raw_dis_size = base::_value_size*ownership().dis_size();
151
size_type raw_size = base::_value_size*ownership().size();
152
distributor raw_ownership (raw_dis_size, ownership().comm(), raw_size);
153
distributor::tag_type tag_init = distributor::get_new_tag();
156
raw_ext_idx.begin().operator->(),
158
raw_id.begin().operator->(),
159
raw_ownership.dis_size(),
160
raw_ownership.begin().operator->(),
162
raw_ownership.comm(),
166
// 4) begin scatter: send local data to others and get ask for missing data
167
std::vector<raw_type> raw_buffer (raw_ext_idx.size());
168
distributor::tag_type tag = distributor::get_new_tag();
169
// access to an iterator to raw_data (behaves as a pointer on raw_type)
170
typedef typename base::base raw_base;
171
typename raw_base::const_iterator raw_begin = raw_base::begin();
174
raw_buffer.begin().operator->(),
177
set_op<raw_type,raw_type>(),
179
raw_ownership.comm());
181
// 5) end scatter: receive missing data
184
raw_buffer.begin().operator->(),
187
set_op<raw_type,raw_type>(),
189
raw_ownership.comm());
191
// 6) unpack raw data: build the associative container: pair (ext_idx ; data)
192
iter = ext_idx_set.begin();
193
for (size_type i = 0; i < ext_idx_set.size(); i++, iter++) {
194
size_type idx = *iter;
195
// rebuild one typed element from its _data_size raw scalars
automatic_value_type value (base::_parameter);
196
typename automatic_value_type::iterator p = value._data_begin();
197
for (size_type l = 0; l < base::_data_size; l++, p++) {
198
size_type raw_i = base::_data_size*i + l;
199
*p = raw_buffer[raw_i];
201
ext_idx_map.insert (std::make_pair (idx, value));
204
// dis_at -- read access by global (distributed) index.
// Fragment: braces and the else-path delimiters are missing from this
// chunk; bare numeric lines are extraction artifacts.
template <class T, class A>
205
typename hack_array_mpi_rep<T,A>::const_reference
206
hack_array_mpi_rep<T,A>::dis_at (const size_type dis_i) const
208
// owned locally: translate to a local index and return directly
if (ownership().is_owned(dis_i)) {
209
size_type i = dis_i - ownership().first_index();
210
return operator[](i);
212
// otherwise the value must already be cached in _ext_x -- presumably
// fetched earlier by append_dis_entry; TODO confirm
typename scatter_map_type::const_iterator iter = _ext_x.find (dis_i);
213
check_macro (iter != _ext_x.end(), "unexpected external index="<<dis_i);
214
return (*iter).second;
216
// ===============================================================
218
// ===============================================================
219
// put_values -- print the whole distributed array on the io process, using
// put_element to write each element. Non-io processes send their raw data;
// the io process prints its own chunk then receives and prints the others.
// Fragment: return type, braces, the declaration of max_size and the
// if/else delimiters are missing from this chunk; bare numeric lines are
// extraction artifacts.
template <class T, class A>
270
template <class PutFunction>
271
hack_array_mpi_rep<T,A>::put_values (odiststream& ops, PutFunction put_element) const
273
distributor::tag_type tag = distributor::get_new_tag();
274
std::ostream& os = ops.os();
276
// determine maximum message to arrive
278
mpi::reduce(comm(), size(), max_size, mpi::maximum<size_type>(), 0);
280
size_type io_proc = odiststream::io_proc();
281
if (ownership().process() == io_proc) {
282
// io process: first print the locally-owned chunk
base::put_values (ops, put_element);
283
// then, receive and print messages
284
std::vector<raw_type> raw_values (max_size*base::_data_size, std::numeric_limits<raw_type>::max());
285
for (size_type iproc = 0; iproc < ownership().n_process(); iproc++) {
286
if (iproc == io_proc) continue;
287
size_type loc_sz_i = ownership().size(iproc);
288
if (loc_sz_i == 0) continue;
289
mpi::status status = comm().recv(iproc, tag, raw_values.begin().operator->(), raw_values.size());
290
// sanity check: the message must contain exactly loc_sz_i elements
boost::optional<int> n_data_opt = status.count<raw_type>();
291
check_macro (n_data_opt, "receive failed");
292
size_type n_data = n_data_opt.get();
293
check_macro (n_data == loc_sz_i*base::_data_size, "unexpected receive message size");
294
// rebuild each element from its raw scalars and print it
typename T::automatic_type tmp (base::_parameter);
295
for (size_type i = 0; i < loc_sz_i; i++) {
296
typename T::iterator p = tmp._data_begin();
297
for (size_type iloc = 0; iloc < base::_data_size; iloc++, p++) {
298
*p = raw_values [i*base::_data_size + iloc];
300
put_element (os, tmp);
307
// non-io process: pack the payload part of each local element
// (skipping the _value_size - _data_size header slots) and send it
std::vector<raw_type> raw_values (size()*base::_data_size, std::numeric_limits<raw_type>::max());
308
for (size_type i = 0, n = size(); i < n; i++) {
309
for (size_type j = 0; j < base::_data_size; j++) {
310
raw_values [i*base::_data_size + j] = raw_base::operator[] (i*base::_value_size + j+(base::_value_size - base::_data_size));
313
comm().send(io_proc, tag, raw_values.begin().operator->(), raw_values.size());
269
// put_values(ops) -- convenience overload: print with the default element
// writer. Fragment: return type and braces are missing from this chunk.
template <class T, class A>
271
hack_array_mpi_rep<T,A>::put_values (odiststream& ops) const
273
return put_values (ops, _array_put_element_type<generic_value_type>());
275
// get_values -- read the whole distributed array from a stream: the io
// process loads its own chunk, then loads and sends each other process's
// chunk; the other processes receive and unpack theirs. Fragment: return
// type, braces and the if/else delimiters are missing from this chunk;
// bare numeric lines are extraction artifacts.
template <class T, class A>
276
template <class GetFunction>
278
hack_array_mpi_rep<T,A>::get_values (idiststream& ips, GetFunction get_element)
280
distributor::tag_type tag = distributor::get_new_tag();
281
std::istream& is = ips.is();
282
size_type my_proc = ownership().process();
283
size_type io_proc = odiststream::io_proc();
284
// largest chunk size over all processes, used to size the scratch array
size_type size_max = 1;
285
for (size_type iproc = 0; iproc < ownership().n_process(); iproc++) {
286
size_max = std::max (size_max, ownership().size(iproc));
288
distributor io_ownership (size_max, comm(), (my_proc == io_proc) ? size_max : 0);
289
hack_array_seq_rep<T,A> data_proc_j (io_ownership, base::_parameter);
290
if (my_proc == io_proc) {
291
// load first chunk associated to proc 0
292
check_macro (load_chunk (is, begin(), end(), get_element), "read failed on input stream.");
293
if (ownership().n_process() > 1) {
294
// read in other chunks and send to other processors
295
// determine maximum chunk owned by other
296
std::vector<raw_type> raw_values (size_max*base::_data_size, std::numeric_limits<raw_type>::max());
297
typename hack_array_seq_rep<T,A>::iterator start_j = data_proc_j.begin();
298
// odd that every block is read into the same memory area
299
// without waiting for the previous send to complete before the next block overwrites it?
300
for (size_type jproc = 0; jproc < ownership().n_process(); jproc++) {
301
if (jproc == io_proc) continue;
302
// load first chunk associated to proc j
303
size_type loc_sz_j = ownership().size(jproc);
304
if (loc_sz_j == 0) continue;
305
typename hack_array_seq_rep<T,A>::iterator last_j = start_j + loc_sz_j;
306
check_macro (load_chunk (is, start_j, last_j, get_element), "read failed on input stream.");
307
// pack the payload slots (skipping header slots) for sending
for (size_type i = 0, n = loc_sz_j; i < n; i++) {
308
for (size_type j = 0; j < base::_data_size; j++) {
309
raw_values [i*base::_data_size + j] = data_proc_j.raw_base::operator[] (i*base::_value_size + j+(base::_value_size - base::_data_size));
312
comm().send (jproc, tag, raw_values.begin().operator->(), loc_sz_j*base::_data_size);
317
// non-io process: receive this process's chunk and unpack it into
// the payload slots of the local raw storage
std::vector<raw_type> raw_values (size()*base::_data_size, std::numeric_limits<raw_type>::max());
318
comm().recv (io_proc, tag, raw_values.begin().operator->(), size()*base::_data_size);
319
for (size_type i = 0, n = size(); i < n; i++) {
320
for (size_type j = 0; j < base::_data_size; j++) {
321
raw_base::operator[] (i*base::_value_size + j+(base::_value_size - base::_data_size))
322
= raw_values [i*base::_data_size + j];
329
// get_values(ips) -- convenience overload: read with the default element
// reader. Fragment: return type and braces are missing from this chunk.
template <class T, class A>
331
hack_array_mpi_rep<T,A>::get_values (idiststream& ips)
333
return get_values (ips, _array_get_element_type<generic_value_type>());
335
// permuted_put_values -- print the array with its elements reordered by
// perm: each local element i is scattered to global slot perm[i] of a
// temporary array owned entirely by the io process, then printed
// sequentially there. Fragment: return type, braces and the ops parameter
// declaration are missing from this chunk; bare numeric lines are
// extraction artifacts.
template <class T, class A>
336
template <class PutFunction, class Permutation>
338
hack_array_mpi_rep<T,A>::permuted_put_values (
340
const Permutation& perm,
341
PutFunction put_element) const
343
// NOTE: could be merged with array::permuted_put_value : same code exactly
344
assert_macro (perm.size() == size(), "permutation size does not match");
345
size_type io_proc = odiststream::io_proc();
346
size_type my_proc = comm().rank();
347
// all dis_size() entries are owned by the io process
distributor io_ownership (dis_size(), comm(), (my_proc == io_proc) ? dis_size() : 0);
348
hack_array_mpi_rep<T,A> perm_x (io_ownership, base::_parameter);
349
for (size_type i = 0, n = size(); i < n; i++) {
350
perm_x.dis_entry (perm[i]) = operator[](i);
352
// flush the distributed writes before printing
perm_x.dis_entry_assembly_begin();
353
perm_x.dis_entry_assembly_end();
354
// sequential print on the io process (which owns everything)
perm_x.hack_array_seq_rep<T,A>::put_values (ops, put_element);
357
// ===============================================================
359
// ===============================================================
360
// repartition -- redistribute *this according to a per-element target
// partition: element ie (old numbering) moves to process partition[ie].
// Outputs: new_array (redistributed data), old_numbering (for each new
// slot, its old global index), new_numbering (for each old slot, its new
// global index). Fragment: return type, a second template header (for A2),
// braces and some trailing statements are missing from this chunk; bare
// numeric lines are extraction artifacts.
template <class T, class A>
363
hack_array_mpi_rep<T,A>::repartition ( // old_numbering for *this
364
const array_mpi_rep<size_type,A2>& partition, // old_ownership
365
hack_array_mpi_rep<T,A>& new_array, // new_ownership
366
array_mpi_rep<size_type,A2>& old_numbering, // new_ownership
367
array_mpi_rep<size_type,A2>& new_numbering) const // old_ownership
370
communicator comm = ownership().comm();
371
size_type nproc = comm.size();
372
size_type my_proc = comm.rank();
373
// count how many local elements go to each target process
vector<size_type> send_local_elt_size (nproc, 0);
374
typename array_mpi_rep<size_type,A2>::const_iterator iter_part = partition.begin();
375
for (size_type ie = 0; ie < partition.size(); ie++, iter_part++) {
376
send_local_elt_size [*iter_part]++;
378
// exchange counts: recv_local_elt_size[p] = elements arriving from p
vector<size_type> recv_local_elt_size (nproc, 0);
379
all_to_all (comm, send_local_elt_size, recv_local_elt_size);
380
// prefix sums give the receive offsets per source process
vector<size_type> recv_local_elt_start (nproc+1);
381
recv_local_elt_start [0] = 0;
382
for (size_type iproc = 0; iproc < nproc; iproc++) {
383
recv_local_elt_start [iproc+1] = recv_local_elt_start [iproc] + recv_local_elt_size[iproc];
385
// each process learns its starting offset on every target process
vector<size_type> send_local_elt_start (nproc);
386
all_to_all (comm, recv_local_elt_start.begin().operator->(), send_local_elt_start.begin().operator->());
387
size_type new_local_n_elt = recv_local_elt_start [nproc];
388
size_type global_n_elt = dis_size();
390
// re-distribute data:
391
distributor new_elt_ownership (global_n_elt, comm, new_local_n_elt);
392
new_array.resize (new_elt_ownership, base::_parameter); // CHANGE FROM ARRAY here
393
old_numbering.resize (new_elt_ownership, numeric_limits<size_type>::max());
394
new_numbering.resize (ownership(), numeric_limits<size_type>::max());
395
iter_part = partition.begin();
396
const_iterator iter_elt = begin();
397
typename array_mpi_rep<size_type,A2>::iterator iter_new_num_elt = new_numbering.begin();
398
for (size_type ie = 0, ne = partition.size(); ie < ne; ie++, iter_part++, iter_elt++, iter_new_num_elt++) {
399
size_type iproc = *iter_part;
400
const generic_value_type& x = *iter_elt; // CHANGE FROM ARRAY here
401
// new global slot = first index owned by iproc + running offset
size_type new_global_ie = new_elt_ownership[iproc] + send_local_elt_start[iproc];
402
new_array.dis_entry (new_global_ie) = x;
403
*iter_new_num_elt = new_global_ie;
404
size_type old_global_ie = ownership()[my_proc] + ie;
405
old_numbering.dis_entry (new_global_ie) = old_global_ie;
406
// advance the running offset for this target process
send_local_elt_start[iproc]++;
408
// flush all deferred distributed writes
new_array.dis_entry_assembly_begin ();
409
new_array.dis_entry_assembly_end();
410
old_numbering.dis_entry_assembly_begin();
411
old_numbering.dis_entry_assembly_end();
414
} // namespace rheolef
415
#endif // _RHEOLEF_HAVE_MPI