~ubuntu-branches/ubuntu/saucy/deal.ii/saucy

« back to all changes in this revision

Viewing changes to contrib/boost/include/boost/graph/distributed/detail/queue.ipp

  • Committer: Package Import Robot
  • Author(s): "Adam C. Powell, IV", Adam C. Powell, IV, Christophe Trophime
  • Date: 2012-02-21 06:57:30 UTC
  • mfrom: (3.1.7 sid)
  • Revision ID: package-import@ubuntu.com-20120221065730-r2iz70lg557wcd2e
Tags: 7.1.0-1
[ Adam C. Powell, IV ]
* New upstream (closes: #652057).
* Updated to use PETSc and SLEPc 3.2, and forward-ported all patches.
* Removed NetCDF Build-Depends because it uses serial HDF5.
* Made Sacado cmath patch work with new configure.
* Moved -dev package symlink lines in rules to arch all section.

[ Christophe Trophime ]
* debian/rules:
   - add dh_strip --dbg-package to generate dbg package (closes: #652058)
   - add .install files to simplify rules
* Add support for mumps, arpack (closes: #637655)
* Add patch for slepc 3.2 (closes: #659245)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
// Copyright (C) 2004-2006 The Trustees of Indiana University.
2
 
 
3
 
// Use, modification and distribution is subject to the Boost Software
4
 
// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
5
 
// http://www.boost.org/LICENSE_1_0.txt)
6
 
 
7
 
//  Authors: Douglas Gregor
8
 
//           Andrew Lumsdaine
9
 
#include <boost/optional.hpp>
10
 
#include <cassert>
11
 
#include <boost/graph/parallel/algorithm.hpp>
12
 
#include <boost/graph/parallel/process_group.hpp>
13
 
#include <functional>
14
 
#include <algorithm>
15
 
#include <boost/graph/parallel/simple_trigger.hpp>
16
 
 
17
 
#ifndef BOOST_GRAPH_USE_MPI
18
 
#error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
19
 
#endif
20
 
 
21
 
namespace boost { namespace graph { namespace distributed {
22
 
 
23
 
template<BOOST_DISTRIBUTED_QUEUE_PARMS>
24
 
BOOST_DISTRIBUTED_QUEUE_TYPE::
25
 
distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner,
26
 
                  const Buffer& buffer, bool polling)
27
 
  : process_group(process_group, attach_distributed_object()),
28
 
    owner(owner),
29
 
    buffer(buffer),
30
 
    polling(polling)
31
 
{
32
 
  if (!polling)
33
 
    outgoing_buffers.reset(
34
 
      new outgoing_buffers_t(num_processes(process_group)));
35
 
 
36
 
  setup_triggers();
37
 
}
38
 
 
39
 
template<BOOST_DISTRIBUTED_QUEUE_PARMS>
40
 
BOOST_DISTRIBUTED_QUEUE_TYPE::
41
 
distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner,
42
 
                  const Buffer& buffer, const UnaryPredicate& pred,
43
 
                  bool polling)
44
 
  : process_group(process_group, attach_distributed_object()),
45
 
    owner(owner),
46
 
    buffer(buffer),
47
 
    pred(pred),
48
 
    polling(polling)
49
 
{
50
 
  if (!polling)
51
 
    outgoing_buffers.reset(
52
 
      new outgoing_buffers_t(num_processes(process_group)));
53
 
 
54
 
  setup_triggers();
55
 
}
56
 
 
57
 
template<BOOST_DISTRIBUTED_QUEUE_PARMS>
58
 
BOOST_DISTRIBUTED_QUEUE_TYPE::
59
 
distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner,
60
 
                  const UnaryPredicate& pred, bool polling)
61
 
  : process_group(process_group, attach_distributed_object()),
62
 
    owner(owner),
63
 
    pred(pred),
64
 
    polling(polling)
65
 
{
66
 
  if (!polling)
67
 
    outgoing_buffers.reset(
68
 
      new outgoing_buffers_t(num_processes(process_group)));
69
 
 
70
 
  setup_triggers();
71
 
}
72
 
 
73
 
template<BOOST_DISTRIBUTED_QUEUE_PARMS>
74
 
void
75
 
BOOST_DISTRIBUTED_QUEUE_TYPE::push(const value_type& x)
76
 
{
77
 
  typename ProcessGroup::process_id_type dest = get(owner, x);
78
 
  if (outgoing_buffers)
79
 
    outgoing_buffers->at(dest).push_back(x);
80
 
  else if (dest == process_id(process_group))
81
 
    buffer.push(x);
82
 
  else
83
 
    send(process_group, get(owner, x), msg_push, x);
84
 
}
85
 
 
86
 
template<BOOST_DISTRIBUTED_QUEUE_PARMS>
87
 
bool
88
 
BOOST_DISTRIBUTED_QUEUE_TYPE::empty() const
89
 
{
90
 
  /* Processes will stay here until the buffer is nonempty or
91
 
     synchronization with the other processes indicates that all local
92
 
     buffers are empty (and no messages are in transit).
93
 
   */
94
 
  while (buffer.empty() && !do_synchronize()) ;
95
 
 
96
 
  return buffer.empty();
97
 
}
98
 
 
99
 
template<BOOST_DISTRIBUTED_QUEUE_PARMS>
100
 
typename BOOST_DISTRIBUTED_QUEUE_TYPE::size_type
101
 
BOOST_DISTRIBUTED_QUEUE_TYPE::size() const
102
 
{
103
 
  empty();
104
 
  return buffer.size();
105
 
}
106
 
 
107
 
template<BOOST_DISTRIBUTED_QUEUE_PARMS>
108
 
void BOOST_DISTRIBUTED_QUEUE_TYPE::setup_triggers()
109
 
{
110
 
  using boost::graph::parallel::simple_trigger;
111
 
 
112
 
  simple_trigger(process_group, msg_push, this, 
113
 
                 &distributed_queue::handle_push);
114
 
  simple_trigger(process_group, msg_multipush, this, 
115
 
                 &distributed_queue::handle_multipush);
116
 
}
117
 
 
118
 
template<BOOST_DISTRIBUTED_QUEUE_PARMS>
119
 
void 
120
 
BOOST_DISTRIBUTED_QUEUE_TYPE::
121
 
handle_push(int /*source*/, int /*tag*/, const value_type& value, 
122
 
            trigger_receive_context)
123
 
{
124
 
  if (pred(value)) buffer.push(value);
125
 
}
126
 
 
127
 
template<BOOST_DISTRIBUTED_QUEUE_PARMS>
128
 
void 
129
 
BOOST_DISTRIBUTED_QUEUE_TYPE::
130
 
handle_multipush(int /*source*/, int /*tag*/, 
131
 
                 const std::vector<value_type>& values, 
132
 
                 trigger_receive_context)
133
 
{
134
 
  for (std::size_t i = 0; i < values.size(); ++i)
135
 
    if (pred(values[i])) buffer.push(values[i]);
136
 
}
137
 
 
138
 
template<BOOST_DISTRIBUTED_QUEUE_PARMS>
139
 
bool
140
 
BOOST_DISTRIBUTED_QUEUE_TYPE::do_synchronize() const
141
 
{
142
 
#ifdef PBGL_ACCOUNTING
143
 
  ++num_synchronizations;
144
 
#endif
145
 
 
146
 
  using boost::parallel::all_reduce;
147
 
  using std::swap;
148
 
 
149
 
  typedef typename ProcessGroup::process_id_type process_id_type;
150
 
 
151
 
  if (outgoing_buffers) {
152
 
    // Transfer all of the push requests
153
 
    process_id_type id = process_id(process_group);
154
 
    process_id_type np = num_processes(process_group);
155
 
    for (process_id_type dest = 0; dest < np; ++dest) {
156
 
      outgoing_buffer_t& outgoing = outgoing_buffers->at(dest);
157
 
      std::size_t size = outgoing.size();
158
 
      if (size != 0) {
159
 
        if (dest != id) {
160
 
          send(process_group, dest, msg_multipush, outgoing);
161
 
        } else {
162
 
          for (std::size_t i = 0; i < size; ++i)
163
 
            buffer.push(outgoing[i]);
164
 
        }
165
 
        outgoing.clear();
166
 
      }
167
 
    }
168
 
  }
169
 
  synchronize(process_group);
170
 
 
171
 
  unsigned local_size = buffer.size();
172
 
  unsigned global_size =
173
 
    all_reduce(process_group, local_size, std::plus<unsigned>());
174
 
  return global_size == 0;
175
 
}
176
 
 
177
 
} } } // end namespace boost::graph::distributed