3
// ************************************************************************
5
// Kokkos: Manycore Performance-Portable Multidimensional Arrays
6
// Copyright (2012) Sandia Corporation
8
// Under the terms of Contract DE-AC04-94AL85000 with Sandia Corporation,
9
// the U.S. Government retains certain rights in this software.
11
// Redistribution and use in source and binary forms, with or without
12
// modification, are permitted provided that the following conditions are
15
// 1. Redistributions of source code must retain the above copyright
16
// notice, this list of conditions and the following disclaimer.
18
// 2. Redistributions in binary form must reproduce the above copyright
19
// notice, this list of conditions and the following disclaimer in the
20
// documentation and/or other materials provided with the distribution.
22
// 3. Neither the name of the Corporation nor the names of the
23
// contributors may be used to endorse or promote products derived from
24
// this software without specific prior written permission.
26
// THIS SOFTWARE IS PROVIDED BY SANDIA CORPORATION "AS IS" AND ANY
27
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
28
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
29
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SANDIA CORPORATION OR THE
30
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
31
// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
32
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
33
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
34
// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
35
// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
36
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38
// Questions? Contact H. Carter Edwards (hcedwar@sandia.gov)
40
// ************************************************************************
44
#include <Kokkos_Core_fwd.hpp>
46
#if defined( KOKKOS_HAVE_QTHREAD )
53
#include <Kokkos_Qthread.hpp>
54
#include <Kokkos_Atomic.hpp>
55
#include <impl/Kokkos_Error.hpp>
57
#define QTHREAD_LOCAL_PRIORITY
59
#include <qthread/qthread.h>
61
//----------------------------------------------------------------------------
67
/** Fixed capacity (number of entries) of the s_exec worker table. */
enum {
  MAXIMUM_QTHREAD_WORKERS = 1024
};
69
/** s_exec is indexed by the reverse rank of the workers
 *  for faster fan-in / fan-out lookups:
 *  [ n - 1 , n - 2 , ... , 0 ]
 */
73
// Table of per-worker executor pointers; capacity is MAXIMUM_QTHREAD_WORKERS.
// worker_exec() addresses it by reverse worker rank.
QthreadExec * s_exec[ MAXIMUM_QTHREAD_WORKERS ];
75
// Shepherd count; Qthread::initialize stores qthread_num_shepherds() here.
int s_number_shepherds = 0 ;
76
// Workers per shepherd; Qthread::initialize stores
// qthread_num_workers_local(NO_SHEPHERD) here.
int s_number_workers_per_shepherd = 0 ;
77
// Total worker count; Qthread::initialize sets it to
// s_number_shepherds * s_number_workers_per_shepherd.
int s_number_workers = 0 ;
80
/** Locate the calling worker's slot in the s_exec table.
 *
 *  With  rank = qthread_shep() * s_number_workers_per_shepherd
 *             + qthread_worker_local(NULL)
 *  the slot is  s_exec[ s_number_workers - ( rank + 1 ) ],
 *  i.e. the table is addressed by reverse rank.
 *
 *  \return address of the s_exec entry belonging to the calling worker.
 */
QthreadExec ** worker_exec()
{
  return s_exec + s_number_workers -
         ( qthread_shep() * s_number_workers_per_shepherd +
           qthread_worker_local(NULL) + 1 );
}
85
// sizeof(QthreadExec) passed through QthreadExec::align_alloc.
// The QthreadExec constructor places m_scratch_alloc this many bytes past 'this'.
const int s_base_size = QthreadExec::align_alloc( sizeof(QthreadExec) );
87
int s_worker_reduce_end = 0 ; /* End of worker reduction memory */
88
int s_worker_shared_end = 0 ; /* Total of worker scratch memory */
89
int s_worker_shared_begin = 0 ; /* Beginning of worker shared memory */
91
// Function and argument invoked by driver_exec_all.
// QthreadExec::exec_all sets both before dispatch and zeroes both afterwards;
// verify_is_process treats a non-zero value as "parallel execution in progress".
QthreadExecFunctionPointer s_active_function = 0 ;
92
const void * s_active_function_arg = 0 ;
95
} /* namespace Impl */
96
} /* namespace Kokkos */
98
//----------------------------------------------------------------------------
102
// Start the qthread runtime for 'thread_count' workers and record the
// shepherd / worker topology in the Impl:: counters.
//
// thread_count : requested number of workers; it must equal both
//                qthread_num_workers() and
//                qthread_num_shepherds() * qthread_num_workers_local(NO_SHEPHERD).
//
// On failure the counters are zeroed, the runtime is finalized if it had been
// started, and a diagnostic is passed to Kokkos::Impl::throw_runtime_exception.
//
// NOTE(review): this excerpt shows no body braces and no declaration of
// 'buffer'; some original lines appear to be missing. Confirm against the
// upstream file before relying on the exact control flow.
void Qthread::initialize( int thread_count )
104
// Environment variable: QTHREAD_NUM_SHEPHERDS
105
// Environment variable: QTHREAD_NUM_WORKERS_PER_SHEP
106
// Environment variable: QTHREAD_HWPAR
110
// Format "QTHREAD_HWPAR=<thread_count>" into 'buffer'.
// NOTE(review): whatever consumes 'buffer' afterwards is not visible here.
snprintf(buffer,sizeof(buffer),"QTHREAD_HWPAR=%d",thread_count);
114
// The runtime must start successfully and report exactly thread_count workers,
// both as shepherds * workers-per-shepherd and as the total worker count.
const bool ok_init = ( QTHREAD_SUCCESS == qthread_initialize() ) &&
115
( thread_count == qthread_num_shepherds() * qthread_num_workers_local(NO_SHEPHERD) ) &&
116
( thread_count == qthread_num_workers() );
118
bool ok_symmetry = true ;
121
// Record the topology reported by the runtime.
Impl::s_number_shepherds = qthread_num_shepherds();
122
Impl::s_number_workers_per_shepherd = qthread_num_workers_local(NO_SHEPHERD);
123
Impl::s_number_workers = Impl::s_number_shepherds * Impl::s_number_workers_per_shepherd ;
125
// Every shepherd must report the same local worker count; stop at the first mismatch.
for ( int i = 0 ; ok_symmetry && i < Impl::s_number_shepherds ; ++i ) {
126
ok_symmetry = ( Impl::s_number_workers_per_shepherd == qthread_num_workers_local(i) );
130
// Failure path: describe what the runtime reported, roll back, and throw.
if ( ! ok_init || ! ok_symmetry ) {
131
std::ostringstream msg ;
133
msg << "Kokkos::Qthread::initialize(" << thread_count << ") FAILED" ;
134
msg << " : qthread_num_shepherds = " << qthread_num_shepherds();
135
msg << " : qthread_num_workers_per_shepherd = " << qthread_num_workers_local(NO_SHEPHERD);
136
msg << " : qthread_num_workers = " << qthread_num_workers();
138
// For an asymmetric layout, list the per-shepherd worker counts.
if ( ! ok_symmetry ) {
139
msg << " : qthread_num_workers_local = {" ;
140
for ( int i = 0 ; i < Impl::s_number_shepherds ; ++i ) {
141
msg << " " << qthread_num_workers_local(i) ;
146
// Roll back the recorded topology so the backend reads as uninitialized.
Impl::s_number_workers = 0 ;
147
Impl::s_number_shepherds = 0 ;
148
Impl::s_number_workers_per_shepherd = 0 ;
150
// Only shut the runtime down if qthread_initialize() itself succeeded
// and the reported counts matched.
if ( ok_init ) { qthread_finalize(); }
152
Kokkos::Impl::throw_runtime_exception( msg.str() );
155
// Request initial worker scratch: reduce_size = 256 , shared_size = 256.
Impl::QthreadExec::resize_worker_scratch( 256 , 256 );
158
// Release the per-worker executors and reset the recorded topology to zero.
//
// NOTE(review): the body of the 'if' below is not visible in this excerpt
// (and no braces are shown); presumably it shuts the qthread runtime down
// when workers exist. Confirm against the upstream file.
void Qthread::finalize()
160
// Free every executor held in the worker table.
Impl::QthreadExec::clear_workers();
162
if ( Impl::s_number_workers ) {
166
// Mark the backend as uninitialized.
Impl::s_number_workers = 0 ;
167
Impl::s_number_shepherds = 0 ;
168
Impl::s_number_workers_per_shepherd = 0 ;
171
void Qthread::print_configuration( std::ostream & s , const bool detail )
173
s << "Kokkos::Qthread {"
174
<< " num_shepherds(" << Impl::s_number_shepherds << ")"
175
<< " num_workers_per_shepherd(" << Impl::s_number_workers_per_shepherd << ")"
176
<< " }" << std::endl ;
179
// Returns a reference to a Qthread object; the int argument is unnamed,
// so the body cannot read it.
// NOTE(review): only the signature is visible in this excerpt; the body
// could not be reviewed.
Qthread & Qthread::instance( int )
185
// NOTE(review): only the signature is visible in this excerpt; what, if
// anything, this fence waits on could not be confirmed.
void Qthread::fence()
189
/** \return the shepherd count held in Impl::s_number_shepherds. */
int Qthread::shepherd_size() const
{
  return Impl::s_number_shepherds ;
}
190
/** \return the workers-per-shepherd count held in Impl::s_number_workers_per_shepherd. */
int Qthread::shepherd_worker_size() const
{
  return Impl::s_number_workers_per_shepherd ;
}
192
} /* namespace Kokkos */
194
//----------------------------------------------------------------------------
200
// Task entry point forked by QthreadExec::exec_all (and also called directly
// by it): invokes the active function on the calling worker's executor.
//
// arg : not referenced by the visible statement.
//
// NOTE(review): no return statement or body braces are visible in this
// excerpt although the function returns aligned_t; lines appear to be missing.
aligned_t driver_exec_all( void * arg )
202
// worker_exec() yields the calling worker's slot; dereference twice to pass
// the QthreadExec object itself.
(*s_active_function)( ** worker_exec() , s_active_function_arg );
207
// Task entry point used by QthreadExec::resize_worker_scratch: if the calling
// worker has no executor yet, allocate and construct one in its s_exec slot.
//
// arg : not referenced by the visible statements.
//
// NOTE(review): no return statement or body braces are visible in this
// excerpt although the function returns aligned_t; lines appear to be missing.
aligned_t driver_resize_worker_scratch( void * arg )
209
// lock_begin serializes the allocation; lock_end counts completed calls.
static volatile int lock_begin = 0 ;
210
static volatile int lock_end = 0 ;
212
// Slot of the calling worker in the reverse-ranked s_exec table.
QthreadExec ** const exec = worker_exec();
214
//----------------------------------------
215
// Serialize allocation for thread safety
217
while ( ! atomic_compare_exchange_strong( & lock_begin , 0 , 1 ) ); // Spin wait to claim lock
219
// Allocate only when this worker's slot is still empty.
const bool ok = 0 == *exec ;
221
// One allocation holds the QthreadExec object (s_base_size bytes) followed by
// s_worker_shared_end bytes of scratch.
// NOTE(review): the malloc result is not checked before the placement-new below.
if ( ok ) { *exec = (QthreadExec *) malloc( s_base_size + s_worker_shared_end ); }
223
lock_begin = 0 ; // release lock
225
// Construct the executor in place, outside the lock, in the memory just allocated.
if ( ok ) { new( *exec ) QthreadExec(); }
227
//----------------------------------------
228
// Wait for all calls to complete to ensure that each worker has executed.
230
// The call whose increment brings the count to s_number_workers resets it to 0.
// NOTE(review): no wait on lock_end is visible in this excerpt; a line may be missing.
if ( s_number_workers == 1 + atomic_fetch_add( & lock_end , 1 ) ) { lock_end = 0 ; }
234
//----------------------------------------
239
void verify_is_process( const char * const label , bool not_active = false )
241
const bool not_process = 0 != qthread_shep() || 0 != qthread_worker_local(NULL);
242
const bool is_active = not_active && ( s_active_function || s_active_function_arg );
244
if ( not_process || is_active ) {
245
std::string msg( label );
246
msg.append( " : FAILED" );
247
if ( not_process ) msg.append(" : not called by main process");
248
if ( is_active ) msg.append(" : parallel execution in progress");
249
Kokkos::Impl::throw_runtime_exception( msg );
255
/** Construct the executor for the calling worker.
 *
 *  Ranks are queried from the qthread runtime at construction time, so the
 *  object must be constructed by the worker it describes.
 *
 *  - m_worker_base   : start of the s_exec table.
 *  - m_shepherd_base : first slot of this shepherd's contiguous range in the
 *                      reverse-ranked table,
 *                      s_exec + workers_per_shepherd * ( shepherds - ( shepherd_rank + 1 ) ).
 *  - m_scratch_alloc : scratch memory located s_base_size bytes past 'this'.
 */
QthreadExec::QthreadExec()
{
  const int shepherd_rank        = qthread_shep();
  const int shepherd_worker_rank = qthread_worker_local(NULL);
  const int worker_rank          = shepherd_rank * s_number_workers_per_shepherd + shepherd_worker_rank ;

  m_worker_base          = s_exec ;
  m_shepherd_base        = s_exec + s_number_workers_per_shepherd * ( ( s_number_shepherds - ( shepherd_rank + 1 ) ) );
  m_scratch_alloc        = ( (unsigned char *) this ) + s_base_size ;
  m_reduce_end           = s_worker_reduce_end ;
  m_shepherd_rank        = shepherd_rank ;
  m_shepherd_size        = s_number_shepherds ;
  m_shepherd_worker_rank = shepherd_worker_rank ;
  m_shepherd_worker_size = s_number_workers_per_shepherd ;
  m_worker_rank          = worker_rank ;
  m_worker_size          = s_number_workers ;
  m_worker_state         = QthreadExec::Active ;
}
274
void QthreadExec::clear_workers()
276
for ( int iwork = 0 ; iwork < s_number_workers ; ++iwork ) {
277
free( s_exec[iwork] );
282
// Builds a Qthread::scratch_memory_space over this shepherd's shared scratch
// region: it starts s_worker_shared_begin bytes into the scratch allocation
// of the executor at *m_shepherd_base and spans
// s_worker_shared_end - s_worker_shared_begin bytes.
//
// NOTE(review): the visible expression has no assignment target, closing
// parenthesis, or body braces; presumably the result is assigned to 'space'.
// Confirm against the upstream file.
void QthreadExec::shared_reset( Qthread::scratch_memory_space & space )
285
Qthread::scratch_memory_space(
286
((unsigned char *) (**m_shepherd_base).m_scratch_alloc ) + s_worker_shared_begin ,
287
s_worker_shared_end - s_worker_shared_begin
291
// Grow the per-worker scratch layout when the requested sizes exceed the
// current ones, then have every worker (re)allocate its executor.
//
// reduce_size : bytes requested for the reduction region.
// shared_size : bytes requested for the shared region.
//
// Scratch layout, as offsets from m_scratch_alloc:
//   [ 0 , s_worker_reduce_end )                      reduction
//   [ s_worker_reduce_end , s_worker_shared_begin )  align_alloc( 8 ) bytes ("shepherd scan")
//   [ s_worker_shared_begin , s_worker_shared_end )  shared
//
// Reports workers without an executor through
// Kokkos::Impl::throw_runtime_exception.
//
// NOTE(review): 'ok' is used below but its declaration is not visible, and no
// body braces are shown; some original lines appear to be missing, including
// whatever performs the "clear" mentioned in the first inner comment.
void QthreadExec::resize_worker_scratch( const int reduce_size , const int shared_size )
293
const int exec_all_reduce_alloc = align_alloc( reduce_size );
294
const int shepherd_scan_alloc = align_alloc( 8 );
295
const int shepherd_shared_end = exec_all_reduce_alloc + shepherd_scan_alloc + align_alloc( shared_size );
297
// Only grow; requests that fit in the current layout leave it unchanged.
if ( s_worker_reduce_end < exec_all_reduce_alloc ||
298
s_worker_shared_end < shepherd_shared_end ) {
300
// Clear current worker memory before allocating new worker memory
303
// Increase the buffers to an aligned allocation
304
s_worker_reduce_end = exec_all_reduce_alloc ;
305
s_worker_shared_begin = exec_all_reduce_alloc + shepherd_scan_alloc ;
306
s_worker_shared_end = shepherd_shared_end ;
308
// Need to query which shepherd this main 'process' is running...
310
// Have each worker resize its memory for proper first-touch
311
for ( int jshep = 0 ; jshep < s_number_shepherds ; ++jshep ) {
312
// Shepherd 0 gets one fewer forked task; the direct call further down
// covers that worker.
// NOTE(review): this assumes the caller runs on shepherd 0, whereas exec_all
// queries qthread_shep() for the same purpose.
for ( int i = jshep ? 0 : 1 ; i < s_number_workers_per_shepherd ; ++i ) {
314
// Unit tests hang with this call:
316
// qthread_fork_to_local_priority( driver_resize_workers , NULL , NULL , jshep );
319
qthread_fork_to( driver_resize_worker_scratch , NULL , NULL , jshep );
322
// Run the calling thread's own share of the work.
driver_resize_worker_scratch( NULL );
324
// Verify all workers allocated
327
for ( int iwork = 0 ; ok && iwork < s_number_workers ; ++iwork ) { ok = 0 != s_exec[iwork] ; }
330
// Build a diagnostic listing each worker whose slot is still empty.
std::ostringstream msg ;
331
msg << "Kokkos::Impl::QthreadExec::resize : FAILED for workers {" ;
332
for ( int iwork = 0 ; iwork < s_number_workers ; ++iwork ) {
333
// s_exec is reverse-ranked, so convert the slot index back to a worker rank.
if ( 0 == s_exec[iwork] ) { msg << " " << ( s_number_workers - ( iwork + 1 ) ); }
336
Kokkos::Impl::throw_runtime_exception( msg.str() );
341
// Run 'func' with 'arg' once per worker: fork driver_exec_all to the other
// workers and call it directly on the calling thread.
//
// (unnamed Qthread &) : not referenced.
// func                : function each worker invokes on its own QthreadExec.
// arg                 : opaque argument forwarded to func.
//
// verify_is_process reports an error (via throw_runtime_exception) unless the
// caller is the main process and no other dispatch is active.
//
// NOTE(review): no body braces are visible in this excerpt, and nothing here
// waits for the forked tasks before the active function is cleared;
// presumably completion is synchronized inside func. Confirm against upstream.
void QthreadExec::exec_all( Qthread & , QthreadExecFunctionPointer func , const void * arg )
343
verify_is_process("QthreadExec::exec_all(...)",true);
345
// Publish the work for driver_exec_all to pick up.
s_active_function = func ;
346
s_active_function_arg = arg ;
348
// Need to query which shepherd this main 'process' is running...
350
const int main_shep = qthread_shep();
352
// iwork is incremented but not otherwise used in the visible code.
for ( int jshep = 0 , iwork = 0 ; jshep < s_number_shepherds ; ++jshep ) {
353
// The calling thread's shepherd gets one fewer forked task; the direct call
// further down covers that worker.
for ( int i = jshep != main_shep ? 0 : 1 ; i < s_number_workers_per_shepherd ; ++i , ++iwork ) {
355
// Unit tests hang with this call:
357
// qthread_fork_to_local_priority( driver_exec_all , NULL , NULL , jshep );
360
qthread_fork_to( driver_exec_all , NULL , NULL , jshep );
363
// Run the calling thread's own share of the work.
driver_exec_all( NULL );
365
// Mark the dispatch as finished so verify_is_process accepts the next one.
s_active_function = 0 ;
366
s_active_function_arg = 0 ;
369
void * QthreadExec::exec_all_reduce_result()
371
return s_exec[0]->m_scratch_alloc ;
374
} /* namespace Impl */
375
} /* namespace Kokkos */
377
//----------------------------------------------------------------------------
379
#endif /* #if defined( KOKKOS_HAVE_QTHREAD ) */