1
// Copyright (C) 2004-2006 The Trustees of Indiana University.
3
// Use, modification and distribution is subject to the Boost Software
4
// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
5
// http://www.boost.org/LICENSE_1_0.txt)
7
// Authors: Douglas Gregor
9
#include <boost/optional.hpp>
11
#include <boost/graph/parallel/algorithm.hpp>
12
#include <boost/graph/parallel/process_group.hpp>
15
#include <boost/graph/parallel/simple_trigger.hpp>
17
#ifndef BOOST_GRAPH_USE_MPI
18
#error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
21
namespace boost { namespace graph { namespace distributed {
23
template<BOOST_DISTRIBUTED_QUEUE_PARMS>
24
BOOST_DISTRIBUTED_QUEUE_TYPE::
25
distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner,
26
const Buffer& buffer, bool polling)
27
: process_group(process_group, attach_distributed_object()),
33
outgoing_buffers.reset(
34
new outgoing_buffers_t(num_processes(process_group)));
39
template<BOOST_DISTRIBUTED_QUEUE_PARMS>
40
BOOST_DISTRIBUTED_QUEUE_TYPE::
41
distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner,
42
const Buffer& buffer, const UnaryPredicate& pred,
44
: process_group(process_group, attach_distributed_object()),
51
outgoing_buffers.reset(
52
new outgoing_buffers_t(num_processes(process_group)));
57
template<BOOST_DISTRIBUTED_QUEUE_PARMS>
58
BOOST_DISTRIBUTED_QUEUE_TYPE::
59
distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner,
60
const UnaryPredicate& pred, bool polling)
61
: process_group(process_group, attach_distributed_object()),
67
outgoing_buffers.reset(
68
new outgoing_buffers_t(num_processes(process_group)));
73
template<BOOST_DISTRIBUTED_QUEUE_PARMS>
75
BOOST_DISTRIBUTED_QUEUE_TYPE::push(const value_type& x)
77
typename ProcessGroup::process_id_type dest = get(owner, x);
79
outgoing_buffers->at(dest).push_back(x);
80
else if (dest == process_id(process_group))
83
send(process_group, get(owner, x), msg_push, x);
86
template<BOOST_DISTRIBUTED_QUEUE_PARMS>
88
BOOST_DISTRIBUTED_QUEUE_TYPE::empty() const
90
/* Processes will stay here until the buffer is nonempty or
91
synchronization with the other processes indicates that all local
92
buffers are empty (and no messages are in transit).
94
while (buffer.empty() && !do_synchronize()) ;
96
return buffer.empty();
99
template<BOOST_DISTRIBUTED_QUEUE_PARMS>
100
typename BOOST_DISTRIBUTED_QUEUE_TYPE::size_type
101
BOOST_DISTRIBUTED_QUEUE_TYPE::size() const
104
return buffer.size();
107
template<BOOST_DISTRIBUTED_QUEUE_PARMS>
108
void BOOST_DISTRIBUTED_QUEUE_TYPE::setup_triggers()
110
using boost::graph::parallel::simple_trigger;
112
simple_trigger(process_group, msg_push, this,
113
&distributed_queue::handle_push);
114
simple_trigger(process_group, msg_multipush, this,
115
&distributed_queue::handle_multipush);
118
template<BOOST_DISTRIBUTED_QUEUE_PARMS>
120
BOOST_DISTRIBUTED_QUEUE_TYPE::
121
handle_push(int /*source*/, int /*tag*/, const value_type& value,
122
trigger_receive_context)
124
if (pred(value)) buffer.push(value);
127
template<BOOST_DISTRIBUTED_QUEUE_PARMS>
129
BOOST_DISTRIBUTED_QUEUE_TYPE::
130
handle_multipush(int /*source*/, int /*tag*/,
131
const std::vector<value_type>& values,
132
trigger_receive_context)
134
for (std::size_t i = 0; i < values.size(); ++i)
135
if (pred(values[i])) buffer.push(values[i]);
138
template<BOOST_DISTRIBUTED_QUEUE_PARMS>
140
BOOST_DISTRIBUTED_QUEUE_TYPE::do_synchronize() const
142
#ifdef PBGL_ACCOUNTING
143
++num_synchronizations;
146
using boost::parallel::all_reduce;
149
typedef typename ProcessGroup::process_id_type process_id_type;
151
if (outgoing_buffers) {
152
// Transfer all of the push requests
153
process_id_type id = process_id(process_group);
154
process_id_type np = num_processes(process_group);
155
for (process_id_type dest = 0; dest < np; ++dest) {
156
outgoing_buffer_t& outgoing = outgoing_buffers->at(dest);
157
std::size_t size = outgoing.size();
160
send(process_group, dest, msg_multipush, outgoing);
162
for (std::size_t i = 0; i < size; ++i)
163
buffer.push(outgoing[i]);
169
synchronize(process_group);
171
unsigned local_size = buffer.size();
172
unsigned global_size =
173
all_reduce(process_group, local_size, std::plus<unsigned>());
174
return global_size == 0;
177
} } } // end namespace boost::graph::distributed