~ubuntu-branches/debian/sid/lammps/sid

« back to all changes in this revision

Viewing changes to lib/kokkos/core/src/Qthread/Kokkos_QthreadExec.cpp

  • Committer: Package Import Robot
  • Author(s): Anton Gladky
  • Date: 2015-04-29 23:44:49 UTC
  • mfrom: (5.1.3 experimental)
  • Revision ID: package-import@ubuntu.com-20150429234449-mbhy9utku6hp6oq8
Tags: 0~20150313.gitfa668e1-1
Upload into unstable.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
//@HEADER
 
3
// ************************************************************************
 
4
//
 
5
//   Kokkos: Manycore Performance-Portable Multidimensional Arrays
 
6
//              Copyright (2012) Sandia Corporation
 
7
//
 
8
// Under the terms of Contract DE-AC04-94AL85000 with Sandia Corporation,
 
9
// the U.S. Government retains certain rights in this software.
 
10
//
 
11
// Redistribution and use in source and binary forms, with or without
 
12
// modification, are permitted provided that the following conditions are
 
13
// met:
 
14
//
 
15
// 1. Redistributions of source code must retain the above copyright
 
16
// notice, this list of conditions and the following disclaimer.
 
17
//
 
18
// 2. Redistributions in binary form must reproduce the above copyright
 
19
// notice, this list of conditions and the following disclaimer in the
 
20
// documentation and/or other materials provided with the distribution.
 
21
//
 
22
// 3. Neither the name of the Corporation nor the names of the
 
23
// contributors may be used to endorse or promote products derived from
 
24
// this software without specific prior written permission.
 
25
//
 
26
// THIS SOFTWARE IS PROVIDED BY SANDIA CORPORATION "AS IS" AND ANY
 
27
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 
28
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 
29
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SANDIA CORPORATION OR THE
 
30
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
 
31
// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 
32
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
 
33
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
 
34
// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
 
35
// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 
36
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
37
//
 
38
// Questions? Contact  H. Carter Edwards (hcedwar@sandia.gov)
 
39
//
 
40
// ************************************************************************
 
41
//@HEADER
 
42
*/
 
43
 
 
44
#include <Kokkos_Core_fwd.hpp>
 
45
 
 
46
#if defined( KOKKOS_HAVE_QTHREAD )
 
47
 
 
48
#include <stdio.h>
 
49
#include <stdlib.h>
 
50
#include <iostream>
 
51
#include <sstream>
 
52
#include <utility>
 
53
#include <Kokkos_Qthread.hpp>
 
54
#include <Kokkos_Atomic.hpp>
 
55
#include <impl/Kokkos_Error.hpp>
 
56
 
 
57
#define QTHREAD_LOCAL_PRIORITY
 
58
 
 
59
#include <qthread/qthread.h>
 
60
 
 
61
//----------------------------------------------------------------------------
 
62
 
 
63
namespace Kokkos {
 
64
namespace Impl {
 
65
namespace {
 
66
 
 
67
enum { MAXIMUM_QTHREAD_WORKERS = 1024 };
 
68
 
 
69
/** s_exec is indexed by the reverse rank of the workers
 
70
 *  for faster fan-in / fan-out lookups
 
71
 *  [ n - 1 , n - 2 , ... , 0 ]
 
72
 */
 
73
QthreadExec * s_exec[ MAXIMUM_QTHREAD_WORKERS ];
 
74
 
 
75
int  s_number_shepherds            = 0 ;
 
76
int  s_number_workers_per_shepherd = 0 ;
 
77
int  s_number_workers              = 0 ;
 
78
 
 
79
inline
 
80
QthreadExec ** worker_exec()
 
81
{
 
82
  return s_exec + s_number_workers - ( qthread_shep() * s_number_workers_per_shepherd + qthread_worker_local(NULL) + 1 );
 
83
}
 
84
 
 
85
const int s_base_size = QthreadExec::align_alloc( sizeof(QthreadExec) );
 
86
 
 
87
int s_worker_reduce_end   = 0 ; /* End of worker reduction memory    */
 
88
int s_worker_shared_end   = 0 ; /* Total of worker scratch memory    */
 
89
int s_worker_shared_begin = 0 ; /* Beginning of worker shared memory */
 
90
 
 
91
QthreadExecFunctionPointer s_active_function = 0 ;
 
92
const void               * s_active_function_arg = 0 ;
 
93
 
 
94
} /* namespace */
 
95
} /* namespace Impl */
 
96
} /* namespace Kokkos */
 
97
 
 
98
//----------------------------------------------------------------------------
 
99
 
 
100
namespace Kokkos {
 
101
 
 
102
void Qthread::initialize( int thread_count )
 
103
{
 
104
  // Environment variable: QTHREAD_NUM_SHEPHERDS
 
105
  // Environment variable: QTHREAD_NUM_WORKERS_PER_SHEP
 
106
  // Environment variable: QTHREAD_HWPAR
 
107
 
 
108
  {
 
109
    char buffer[256];
 
110
    snprintf(buffer,sizeof(buffer),"QTHREAD_HWPAR=%d",thread_count);
 
111
    putenv(buffer);
 
112
  }
 
113
 
 
114
  const bool ok_init = ( QTHREAD_SUCCESS == qthread_initialize() ) &&
 
115
                       ( thread_count    == qthread_num_shepherds() * qthread_num_workers_local(NO_SHEPHERD) ) &&
 
116
                       ( thread_count    == qthread_num_workers() );
 
117
 
 
118
  bool ok_symmetry = true ;
 
119
 
 
120
  if ( ok_init ) {
 
121
    Impl::s_number_shepherds            = qthread_num_shepherds();
 
122
    Impl::s_number_workers_per_shepherd = qthread_num_workers_local(NO_SHEPHERD);
 
123
    Impl::s_number_workers              = Impl::s_number_shepherds * Impl::s_number_workers_per_shepherd ;
 
124
 
 
125
    for ( int i = 0 ; ok_symmetry && i < Impl::s_number_shepherds ; ++i ) {
 
126
      ok_symmetry = ( Impl::s_number_workers_per_shepherd == qthread_num_workers_local(i) );
 
127
    }
 
128
  }
 
129
 
 
130
  if ( ! ok_init || ! ok_symmetry ) {
 
131
    std::ostringstream msg ;
 
132
 
 
133
    msg << "Kokkos::Qthread::initialize(" << thread_count << ") FAILED" ;
 
134
    msg << " : qthread_num_shepherds = " << qthread_num_shepherds();
 
135
    msg << " : qthread_num_workers_per_shepherd = " << qthread_num_workers_local(NO_SHEPHERD);
 
136
    msg << " : qthread_num_workers = " << qthread_num_workers();
 
137
 
 
138
    if ( ! ok_symmetry ) {
 
139
      msg << " : qthread_num_workers_local = {" ;
 
140
      for ( int i = 0 ; i < Impl::s_number_shepherds ; ++i ) {
 
141
        msg << " " << qthread_num_workers_local(i) ;
 
142
      }
 
143
      msg << " }" ;
 
144
    }
 
145
 
 
146
    Impl::s_number_workers   = 0 ;
 
147
    Impl::s_number_shepherds = 0 ;
 
148
    Impl::s_number_workers_per_shepherd = 0 ;
 
149
 
 
150
    if ( ok_init ) { qthread_finalize(); }
 
151
 
 
152
    Kokkos::Impl::throw_runtime_exception( msg.str() );
 
153
  }
 
154
 
 
155
  Impl::QthreadExec::resize_worker_scratch( 256 , 256 );
 
156
}
 
157
 
 
158
void Qthread::finalize()
 
159
{
 
160
  Impl::QthreadExec::clear_workers();
 
161
 
 
162
  if ( Impl::s_number_workers ) {
 
163
    qthread_finalize();
 
164
  }
 
165
 
 
166
  Impl::s_number_workers    = 0 ;
 
167
  Impl::s_number_shepherds  = 0 ;
 
168
  Impl::s_number_workers_per_shepherd = 0 ;
 
169
}
 
170
 
 
171
void Qthread::print_configuration( std::ostream & s , const bool detail )
 
172
{
 
173
  s << "Kokkos::Qthread {"
 
174
    << " num_shepherds(" << Impl::s_number_shepherds << ")"
 
175
    << " num_workers_per_shepherd(" << Impl::s_number_workers_per_shepherd << ")"
 
176
    << " }" << std::endl ;
 
177
}
 
178
 
 
179
Qthread & Qthread::instance( int )
 
180
{
 
181
  static Qthread q ;
 
182
  return q ;
 
183
}
 
184
 
 
185
void Qthread::fence()
 
186
{
 
187
}
 
188
 
 
189
int Qthread::shepherd_size() const { return Impl::s_number_shepherds ; }
 
190
int Qthread::shepherd_worker_size() const { return Impl::s_number_workers_per_shepherd ; }
 
191
 
 
192
} /* namespace Kokkos */
 
193
 
 
194
//----------------------------------------------------------------------------
 
195
 
 
196
namespace Kokkos {
 
197
namespace Impl {
 
198
namespace {
 
199
 
 
200
aligned_t driver_exec_all( void * arg )
 
201
{
 
202
  (*s_active_function)( ** worker_exec() , s_active_function_arg );
 
203
 
 
204
  return 0 ;
 
205
}
 
206
 
 
207
aligned_t driver_resize_worker_scratch( void * arg )
 
208
{
 
209
  static volatile int lock_begin = 0 ;
 
210
  static volatile int lock_end   = 0 ;
 
211
 
 
212
  QthreadExec ** const exec = worker_exec();
 
213
 
 
214
  //----------------------------------------
 
215
  // Serialize allocation for thread safety
 
216
 
 
217
  while ( ! atomic_compare_exchange_strong( & lock_begin , 0 , 1 ) ); // Spin wait to claim lock
 
218
 
 
219
  const bool ok = 0 == *exec ;
 
220
 
 
221
  if ( ok ) { *exec = (QthreadExec *) malloc( s_base_size + s_worker_shared_end ); }
 
222
 
 
223
  lock_begin = 0 ; // release lock
 
224
 
 
225
  if ( ok ) { new( *exec ) QthreadExec(); }
 
226
 
 
227
  //----------------------------------------
 
228
  // Wait for all calls to complete to insure that each worker has executed.
 
229
 
 
230
  if ( s_number_workers == 1 + atomic_fetch_add( & lock_end , 1 ) ) { lock_end = 0 ; }
 
231
 
 
232
  while ( lock_end );
 
233
 
 
234
  //----------------------------------------
 
235
 
 
236
  return 0 ;
 
237
}
 
238
 
 
239
void verify_is_process( const char * const label , bool not_active = false )
 
240
{
 
241
  const bool not_process = 0 != qthread_shep() || 0 != qthread_worker_local(NULL);
 
242
  const bool is_active   = not_active && ( s_active_function || s_active_function_arg );
 
243
 
 
244
  if ( not_process || is_active ) {
 
245
    std::string msg( label );
 
246
    msg.append( " : FAILED" );
 
247
    if ( not_process ) msg.append(" : not called by main process");
 
248
    if ( is_active )   msg.append(" : parallel execution in progress");
 
249
    Kokkos::Impl::throw_runtime_exception( msg );
 
250
  }
 
251
}
 
252
 
 
253
}
 
254
 
 
255
QthreadExec::QthreadExec()
 
256
{
 
257
  const int shepherd_rank        = qthread_shep();
 
258
  const int shepherd_worker_rank = qthread_worker_local(NULL);
 
259
  const int worker_rank          = shepherd_rank * s_number_workers_per_shepherd + shepherd_worker_rank ;
 
260
 
 
261
  m_worker_base          = s_exec ;
 
262
  m_shepherd_base        = s_exec + s_number_workers_per_shepherd * ( ( s_number_shepherds - ( shepherd_rank + 1 ) ) );
 
263
  m_scratch_alloc        = ( (unsigned char *) this ) + s_base_size ;
 
264
  m_reduce_end           = s_worker_reduce_end ;
 
265
  m_shepherd_rank        = shepherd_rank ;
 
266
  m_shepherd_size        = s_number_shepherds ;
 
267
  m_shepherd_worker_rank = shepherd_worker_rank ;
 
268
  m_shepherd_worker_size = s_number_workers_per_shepherd ;
 
269
  m_worker_rank          = worker_rank ;
 
270
  m_worker_size          = s_number_workers ;
 
271
  m_worker_state         = QthreadExec::Active ;
 
272
}
 
273
 
 
274
void QthreadExec::clear_workers()
 
275
{
 
276
  for ( int iwork = 0 ; iwork < s_number_workers ; ++iwork ) {
 
277
    free( s_exec[iwork] );
 
278
    s_exec[iwork] = 0 ;
 
279
  }
 
280
}
 
281
 
 
282
void QthreadExec::shared_reset( Qthread::scratch_memory_space & space )
 
283
{
 
284
  new( & space )
 
285
    Qthread::scratch_memory_space(
 
286
      ((unsigned char *) (**m_shepherd_base).m_scratch_alloc ) + s_worker_shared_begin ,
 
287
      s_worker_shared_end - s_worker_shared_begin
 
288
    );
 
289
}
 
290
 
 
291
void QthreadExec::resize_worker_scratch( const int reduce_size , const int shared_size )
 
292
{
 
293
  const int exec_all_reduce_alloc = align_alloc( reduce_size );
 
294
  const int shepherd_scan_alloc   = align_alloc( 8 );
 
295
  const int shepherd_shared_end   = exec_all_reduce_alloc + shepherd_scan_alloc + align_alloc( shared_size );
 
296
 
 
297
  if ( s_worker_reduce_end < exec_all_reduce_alloc ||
 
298
       s_worker_shared_end < shepherd_shared_end ) {
 
299
 
 
300
    // Clear current worker memory before allocating new worker memory
 
301
    clear_workers();
 
302
 
 
303
    // Increase the buffers to an aligned allocation
 
304
    s_worker_reduce_end   = exec_all_reduce_alloc ;
 
305
    s_worker_shared_begin = exec_all_reduce_alloc + shepherd_scan_alloc ;
 
306
    s_worker_shared_end   = shepherd_shared_end ;
 
307
 
 
308
    // Need to query which shepherd this main 'process' is running...
 
309
 
 
310
    // Have each worker resize its memory for proper first-touch
 
311
    for ( int jshep = 0 ; jshep < s_number_shepherds ; ++jshep ) {
 
312
    for ( int i = jshep ? 0 : 1 ; i < s_number_workers_per_shepherd ; ++i ) {
 
313
 
 
314
      // Unit tests hang with this call:
 
315
      //
 
316
      // qthread_fork_to_local_priority( driver_resize_workers , NULL , NULL , jshep );
 
317
      //
 
318
 
 
319
      qthread_fork_to( driver_resize_worker_scratch , NULL , NULL , jshep );
 
320
    }}
 
321
 
 
322
    driver_resize_worker_scratch( NULL );
 
323
 
 
324
    // Verify all workers allocated
 
325
 
 
326
    bool ok = true ;
 
327
    for ( int iwork = 0 ; ok && iwork < s_number_workers ; ++iwork ) { ok = 0 != s_exec[iwork] ; }
 
328
 
 
329
    if ( ! ok ) {
 
330
      std::ostringstream msg ;
 
331
      msg << "Kokkos::Impl::QthreadExec::resize : FAILED for workers {" ;
 
332
      for ( int iwork = 0 ; iwork < s_number_workers ; ++iwork ) {
 
333
         if ( 0 == s_exec[iwork] ) { msg << " " << ( s_number_workers - ( iwork + 1 ) ); }
 
334
      }
 
335
      msg << " }" ;
 
336
      Kokkos::Impl::throw_runtime_exception( msg.str() );
 
337
    }
 
338
  }
 
339
}
 
340
 
 
341
void QthreadExec::exec_all( Qthread & , QthreadExecFunctionPointer func , const void * arg )
 
342
{
 
343
  verify_is_process("QthreadExec::exec_all(...)",true);
 
344
 
 
345
  s_active_function     = func ;
 
346
  s_active_function_arg = arg ;
 
347
 
 
348
  // Need to query which shepherd this main 'process' is running...
 
349
 
 
350
  const int main_shep = qthread_shep();
 
351
 
 
352
  for ( int jshep = 0 , iwork = 0 ; jshep < s_number_shepherds ; ++jshep ) {
 
353
  for ( int i = jshep != main_shep ? 0 : 1 ; i < s_number_workers_per_shepherd ; ++i , ++iwork ) {
 
354
 
 
355
    // Unit tests hang with this call:
 
356
    //
 
357
    // qthread_fork_to_local_priority( driver_exec_all , NULL , NULL , jshep );
 
358
    //
 
359
 
 
360
    qthread_fork_to( driver_exec_all , NULL , NULL , jshep );
 
361
  }}
 
362
 
 
363
  driver_exec_all( NULL );
 
364
 
 
365
  s_active_function     = 0 ;
 
366
  s_active_function_arg = 0 ;
 
367
}
 
368
 
 
369
void * QthreadExec::exec_all_reduce_result()
 
370
{
 
371
  return s_exec[0]->m_scratch_alloc ;
 
372
}
 
373
 
 
374
} /* namespace Impl */
 
375
} /* namespace Kokkos */
 
376
 
 
377
//----------------------------------------------------------------------------
 
378
 
 
379
#endif /* #if defined( KOKKOS_HAVE_QTHREAD ) */
 
380