/* ---------------------------------------------------------------------------
 *
 * (c) The GHC Team, 2003-2006
 *
 * Capabilities
 *
 * A Capability represents the token required to execute STG code,
 * and all the state an OS thread/task needs to run Haskell code:
 * its STG registers, a pointer to its TSO, a nursery etc. During
 * STG execution, a pointer to the capability is kept in a
 * register (BaseReg; actually it is a pointer to cap->r).
 *
 * Only in a THREADED_RTS build will there be multiple capabilities;
 * for non-threaded builds there is only one global capability, namely
 * MainCapability.
 *
 * --------------------------------------------------------------------------*/
#include "PosixSource.h"
#include "Rts.h"

#include "Capability.h"
#include "Schedule.h"
#include "Sparks.h"
#include "Trace.h"
#include "sm/GC.h" // for gcWorkerThread()
#include "STM.h"
#include "RtsUtils.h"
// one global capability, this is the Capability for non-threaded
// builds, and for +RTS -N1
Capability MainCapability;

nat n_capabilities = 0;
Capability *capabilities = NULL;

// Holds the Capability which last became free.  This is used so that
// an in-call has a chance of quickly finding a free Capability.
// Maintaining a global free list of Capabilities would require global
// locking, so we don't do that.
Capability *last_free_capability = NULL;

/* GC indicator, in scope for the scheduler, init'ed to false */
volatile StgWord waiting_for_gc = 0;
/* Let foreign code get the current Capability -- assuming there is one!
 * This is useful for unsafe foreign calls because they are called with
 * the current Capability held, but they are not passed it. For example,
 * see the integer-gmp package which calls allocateLocal() in its
 * stgAllocForGMP() function (which gets called by gmp functions).
 */
Capability * rts_unsafeGetMyCapability (void)
{
#if defined(THREADED_RTS)
  return myTask()->cap;
#else
  return &MainCapability;
#endif
}
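
/* As an illustration of the pattern described in the comment above, an
 * allocation hook in the style of integer-gmp's stgAllocForGMP() can
 * recover the current Capability and allocate from its nursery.  This
 * is a hypothetical sketch, not part of the RTS; it assumes the caller
 * is inside an unsafe foreign call, so the Capability is held:
 *
 *   static void *
 *   exampleAllocHook (size_t size_in_bytes)
 *   {
 *       Capability *cap = rts_unsafeGetMyCapability();
 *       nat n_words = (size_in_bytes + sizeof(W_) - 1) / sizeof(W_);
 *       return (void *)allocateLocal(cap, n_words);
 *   }
 */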
#if defined(THREADED_RTS)
STATIC_INLINE rtsBool
globalWorkToDo (void)
{
    return sched_state >= SCHED_INTERRUPTING
        || recent_activity == ACTIVITY_INACTIVE; // need to check for deadlock
}
#endif
#if defined(THREADED_RTS)
StgClosure *
findSpark (Capability *cap)
{
    Capability *robbed;
    StgClosure *spark;
    rtsBool retry;
    nat i = 0;

    if (!emptyRunQueue(cap) || cap->returning_tasks_hd != NULL) {
        // If there are other threads, don't try to run any new
        // sparks: sparks might be speculative, we don't want to take
        // resources away from the main computation.
        return NULL;
    }

    do {
        retry = rtsFalse;

        // first try to get a spark from our own pool.
        // We should be using reclaimSpark(), because it works without
        // needing any atomic instructions:
        //   spark = reclaimSpark(cap->sparks);
        // However, measurements show that this makes at least one benchmark
        // slower (prsa) and doesn't affect the others.
        spark = tryStealSpark(cap);
        if (spark != NULL) {
            cap->sparks_converted++;

            // Post event for running a spark from capability's own pool.
            traceEventRunSpark(cap, cap->r.rCurrentTSO);

            return spark;
        }
        if (!emptySparkPoolCap(cap)) {
            retry = rtsTrue;
        }

        if (n_capabilities == 1) { return NULL; } // makes no sense...

        debugTrace(DEBUG_sched,
                   "cap %d: Trying to steal work from other capabilities",
                   cap->no);

        /* visit cap.s 0..n-1 in sequence until a theft succeeds. We could
           start at a random place instead of 0 as well. */
        for ( i=0 ; i < n_capabilities ; i++ ) {
            robbed = &capabilities[i];
            if (cap == robbed)  // ourselves...
                continue;

            if (emptySparkPoolCap(robbed)) // nothing to steal here
                continue;

            spark = tryStealSpark(robbed);
            if (spark == NULL && !emptySparkPoolCap(robbed)) {
                // we conflicted with another thread while trying to steal;
                // try again later.
                retry = rtsTrue;
            }

            if (spark != NULL) {
                cap->sparks_converted++;

                traceEventStealSpark(cap, cap->r.rCurrentTSO, robbed->no);

                return spark;
            }
            // otherwise: no success, try next one
        }
    } while (retry);

    debugTrace(DEBUG_sched, "No sparks stolen");
    return NULL;
}
146
// Returns True if any spark pool is non-empty at this moment in time
147
// The result is only valid for an instant, of course, so in a sense
148
// is immediately invalid, and should not be relied upon for
155
for (i=0; i < n_capabilities; i++) {
156
if (!emptySparkPoolCap(&capabilities[i])) {
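
/* For context, a sketch (not part of this file) of how the scheduler
 * side consumes the two functions above: anySparks() is a cheap hint
 * checked before committing to spark work.  The logic in Schedule.c's
 * scheduleActivateSpark() is roughly:
 *
 *   static void
 *   exampleActivateSpark (Capability *cap)
 *   {
 *       if (anySparks()) {
 *           createSparkThread(cap); // makes a thread that runs findSpark()
 *           debugTrace(DEBUG_sched, "creating a spark thread");
 *       }
 *   }
 *
 * A stale hint is harmless: the spark thread simply finds no spark
 * and exits.
 */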
/* -----------------------------------------------------------------------------
 * Manage the returning_tasks lists.
 *
 * These functions require cap->lock
 * -------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
STATIC_INLINE void
newReturningTask (Capability *cap, Task *task)
{
    ASSERT_LOCK_HELD(&cap->lock);
    ASSERT(task->next == NULL);
    if (cap->returning_tasks_hd) {
        ASSERT(cap->returning_tasks_tl->next == NULL);
        cap->returning_tasks_tl->next = task;
    } else {
        cap->returning_tasks_hd = task;
    }
    cap->returning_tasks_tl = task;
}

STATIC_INLINE Task *
popReturningTask (Capability *cap)
{
    Task *task;
    ASSERT_LOCK_HELD(&cap->lock);
    task = cap->returning_tasks_hd;
    ASSERT(task);
    cap->returning_tasks_hd = task->next;
    if (!cap->returning_tasks_hd) {
        cap->returning_tasks_tl = NULL;
    }
    task->next = NULL;
    return task;
}
#endif
/* ----------------------------------------------------------------------------
 * Initialisation
 *
 * The Capability is initially marked not free.
 * ------------------------------------------------------------------------- */

static void
initCapability( Capability *cap, nat i )
{
    nat g;

    cap->no = i;
    cap->in_haskell        = rtsFalse;

    cap->run_queue_hd      = END_TSO_QUEUE;
    cap->run_queue_tl      = END_TSO_QUEUE;

#if defined(THREADED_RTS)
    initMutex(&cap->lock);
    cap->running_task      = NULL; // indicates cap is free
    cap->spare_workers     = NULL;
    cap->suspended_ccalls  = NULL;
    cap->returning_tasks_hd = NULL;
    cap->returning_tasks_tl = NULL;
    cap->inbox              = (Message*)END_TSO_QUEUE;
    cap->sparks_created     = 0;
    cap->sparks_converted   = 0;
    cap->sparks_pruned      = 0;
#endif

    cap->f.stgEagerBlackholeInfo = (W_)&__stg_EAGER_BLACKHOLE_info;
    cap->f.stgGCEnter1     = (StgFunPtr)__stg_gc_enter_1;
    cap->f.stgGCFun        = (StgFunPtr)__stg_gc_fun;

    cap->mut_lists  = stgMallocBytes(sizeof(bdescr *) *
                                     RtsFlags.GcFlags.generations,
                                     "initCapability");
    cap->saved_mut_lists = stgMallocBytes(sizeof(bdescr *) *
                                          RtsFlags.GcFlags.generations,
                                          "initCapability");

    for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
        cap->mut_lists[g] = NULL;
    }

    cap->free_tvar_watch_queues = END_STM_WATCH_QUEUE;
    cap->free_invariant_check_queues = END_INVARIANT_CHECK_QUEUE;
    cap->free_trec_chunks = END_STM_CHUNK_LIST;
    cap->free_trec_headers = NO_TREC;
    cap->transaction_tokens = 0;
    cap->context_switch = 0;
    cap->pinned_object_block = NULL;
}
/* ---------------------------------------------------------------------------
 * Function:  initCapabilities()
 *
 * Purpose:   set up the Capability handling. For the THREADED_RTS build,
 *            we keep a table of them, the size of which is
 *            controlled by the user via the RTS flag -N.
 *
 * ------------------------------------------------------------------------- */
void
initCapabilities( void )
{
#if defined(THREADED_RTS)
    nat i;

#ifndef REG_Base
    // We can't support multiple CPUs if BaseReg is not a register
    if (RtsFlags.ParFlags.nNodes > 1) {
        errorBelch("warning: multiple CPUs not supported in this build, reverting to 1");
        RtsFlags.ParFlags.nNodes = 1;
    }
#endif

    n_capabilities = RtsFlags.ParFlags.nNodes;

    if (n_capabilities == 1) {
        capabilities = &MainCapability;
        // THREADED_RTS must work on builds that don't have a mutable
        // BaseReg (eg. unregisterised), so in this case
        // capabilities[0] must coincide with &MainCapability.
    } else {
        capabilities = stgMallocBytes(n_capabilities * sizeof(Capability),
                                      "initCapabilities");
    }

    for (i = 0; i < n_capabilities; i++) {
        initCapability(&capabilities[i], i);
    }

    debugTrace(DEBUG_sched, "allocated %d capabilities", n_capabilities);

#else /* !THREADED_RTS */

    n_capabilities = 1;
    capabilities = &MainCapability;
    initCapability(&MainCapability, 0);

#endif

    // There are no free capabilities to begin with.  We will start
    // a worker Task to each Capability, which will quickly put the
    // Capability on the free list when it finds nothing to do.
    last_free_capability = &capabilities[0];
}
/* ----------------------------------------------------------------------------
 * setContextSwitches: cause all capabilities to context switch as
 * soon as possible.
 * ------------------------------------------------------------------------- */

void setContextSwitches(void)
{
    nat i;
    for (i=0; i < n_capabilities; i++) {
        contextSwitchCapability(&capabilities[i]);
    }
}
/* ----------------------------------------------------------------------------
 * Give a Capability to a Task.  The task must currently be sleeping
 * on its condition variable.
 *
 * Requires cap->lock (modifies cap->running_task).
 *
 * When migrating a Task, the migrater must take task->lock before
 * modifying task->cap, to synchronise with the waking up Task.
 * Additionally, the migrater should own the Capability (when
 * migrating the run queue), or cap->lock (when migrating
 * returning_workers).
 *
 * ------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
STATIC_INLINE void
giveCapabilityToTask (Capability *cap USED_IF_DEBUG, Task *task)
{
    ASSERT_LOCK_HELD(&cap->lock);
    ASSERT(task->cap == cap);
    debugTrace(DEBUG_sched, "passing capability %d to %s %p",
               cap->no, task->incall->tso ? "bound task" : "worker",
               (void *)task->id);
    ACQUIRE_LOCK(&task->lock);
    task->wakeup = rtsTrue;
    // the wakeup flag is needed because signalCondition() doesn't
    // flag the condition if the thread is already running, but we want
    // it to be sticky.
    signalCondition(&task->cond);
    RELEASE_LOCK(&task->lock);
}
#endif
/* ----------------------------------------------------------------------------
 * Function:  releaseCapability(Capability*)
 *
 * Purpose:   Letting go of a capability. Causes a
 *            'returning worker' thread or a 'waiting worker'
 *            to wake up, in that order.
 * ------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
void
releaseCapability_ (Capability* cap,
                    rtsBool always_wakeup)
{
    Task *task;

    task = cap->running_task;

    ASSERT_PARTIAL_CAPABILITY_INVARIANTS(cap,task);

    cap->running_task = NULL;

    // Check to see whether a worker thread can be given
    // the go-ahead to return the result of an external call..
    if (cap->returning_tasks_hd != NULL) {
        giveCapabilityToTask(cap,cap->returning_tasks_hd);
        // The Task pops itself from the queue (see waitForReturnCapability())
        return;
    }

    if (waiting_for_gc == PENDING_GC_SEQ) {
        last_free_capability = cap; // needed?
        debugTrace(DEBUG_sched, "GC pending, set capability %d free", cap->no);
        return;
    }

    // If the next thread on the run queue is a bound thread,
    // give this Capability to the appropriate Task.
    if (!emptyRunQueue(cap) && cap->run_queue_hd->bound) {
        // Make sure we're not about to try to wake ourselves up
        // ASSERT(task != cap->run_queue_hd->bound);
        // assertion is false: in schedule() we force a yield after
        // ThreadBlocked, but the thread may be back on the run queue
        // by now.
        task = cap->run_queue_hd->bound->task;
        giveCapabilityToTask(cap,task);
        return;
    }

    if (!cap->spare_workers) {
        // Create a worker thread if we don't have one.  If the system
        // is interrupted, we only create a worker task if there
        // are threads that need to be completed.  If the system is
        // shutting down, we never create a new worker.
        if (sched_state < SCHED_SHUTTING_DOWN || !emptyRunQueue(cap)) {
            debugTrace(DEBUG_sched,
                       "starting new worker on capability %d", cap->no);
            startWorkerTask(cap);
            return;
        }
    }

    // If we have an unbound thread on the run queue, or if there's
    // anything else to do, give the Capability to a worker thread.
    if (always_wakeup ||
        !emptyRunQueue(cap) || !emptyInbox(cap) ||
        !emptySparkPoolCap(cap) || globalWorkToDo()) {
        if (cap->spare_workers) {
            giveCapabilityToTask(cap,cap->spare_workers);
            // The worker Task pops itself from the queue;
            return;
        }
    }

    last_free_capability = cap;
    debugTrace(DEBUG_sched, "freeing capability %d", cap->no);
}
void
releaseCapability (Capability* cap USED_IF_THREADS)
{
    ACQUIRE_LOCK(&cap->lock);
    releaseCapability_(cap, rtsFalse);
    RELEASE_LOCK(&cap->lock);
}

void
releaseAndWakeupCapability (Capability* cap USED_IF_THREADS)
{
    ACQUIRE_LOCK(&cap->lock);
    releaseCapability_(cap, rtsTrue);
    RELEASE_LOCK(&cap->lock);
}
static void
releaseCapabilityAndQueueWorker (Capability* cap USED_IF_THREADS)
{
    Task *task;

    ACQUIRE_LOCK(&cap->lock);

    task = cap->running_task;

    // If the current task is a worker, save it on the spare_workers
    // list of this Capability.  A worker can mark itself as stopped,
    // in which case it is not replaced on the spare_worker queue.
    // This happens when the system is shutting down (see
    // Schedule.c:workerStart()).
    if (!isBoundTask(task) && !task->stopped) {
        task->next = cap->spare_workers;
        cap->spare_workers = task;
    }
    // Bound tasks just float around attached to their TSOs.

    releaseCapability_(cap,rtsFalse);

    RELEASE_LOCK(&cap->lock);
}
#endif
/* ----------------------------------------------------------------------------
 * waitForReturnCapability( Task *task )
 *
 * Purpose:  when an OS thread returns from an external call,
 * it calls waitForReturnCapability() (via Schedule.resumeThread())
 * to wait for permission to enter the RTS & communicate the
 * result of the external call back to the Haskell thread that
 * made it.
 *
 * ------------------------------------------------------------------------- */
void
waitForReturnCapability (Capability **pCap, Task *task)
{
#if !defined(THREADED_RTS)

    MainCapability.running_task = task;
    task->cap = &MainCapability;
    *pCap = &MainCapability;

#else
    Capability *cap = *pCap;

    if (cap == NULL) {
        // Try last_free_capability first
        cap = last_free_capability;
        if (cap->running_task) {
            nat i;
            // otherwise, search for a free capability
            cap = NULL;
            for (i = 0; i < n_capabilities; i++) {
                if (!capabilities[i].running_task) {
                    cap = &capabilities[i];
                    break;
                }
            }
            if (cap == NULL) {
                // Can't find a free one, use last_free_capability.
                cap = last_free_capability;
            }
        }

        // record the Capability as the one this Task is now associated with.
        task->cap = cap;

    } else {
        ASSERT(task->cap == cap);
    }

    ACQUIRE_LOCK(&cap->lock);

    debugTrace(DEBUG_sched, "returning; I want capability %d", cap->no);

    if (!cap->running_task) {
        // It's free; just grab it
        cap->running_task = task;
        RELEASE_LOCK(&cap->lock);
    } else {
        newReturningTask(cap,task);
        RELEASE_LOCK(&cap->lock);

        for (;;) {
            ACQUIRE_LOCK(&task->lock);
            // task->lock held, cap->lock not held
            if (!task->wakeup) waitCondition(&task->cond, &task->lock);
            cap = task->cap;
            task->wakeup = rtsFalse;
            RELEASE_LOCK(&task->lock);

            // now check whether we should wake up...
            ACQUIRE_LOCK(&cap->lock);
            if (cap->running_task == NULL) {
                if (cap->returning_tasks_hd != task) {
                    giveCapabilityToTask(cap,cap->returning_tasks_hd);
                    RELEASE_LOCK(&cap->lock);
                    continue;
                }
                cap->running_task = task;
                popReturningTask(cap);
                RELEASE_LOCK(&cap->lock);
                break;
            }
            RELEASE_LOCK(&cap->lock);
        }
    }

    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);

    debugTrace(DEBUG_sched, "resuming capability %d", cap->no);

    *pCap = cap;
#endif
}
#if defined(THREADED_RTS)
/* ----------------------------------------------------------------------------
 * yieldCapability
 * ------------------------------------------------------------------------- */

void
yieldCapability (Capability** pCap, Task *task)
{
    Capability *cap = *pCap;

    if (waiting_for_gc == PENDING_GC_PAR) {
        traceEventGcStart(cap);
        gcWorkerThread(cap);
        traceEventGcEnd(cap);
        return;
    }

    debugTrace(DEBUG_sched, "giving up capability %d", cap->no);

    // We must now release the capability and wait to be woken up
    // again.
    task->wakeup = rtsFalse;
    releaseCapabilityAndQueueWorker(cap);

    for (;;) {
        ACQUIRE_LOCK(&task->lock);
        // task->lock held, cap->lock not held
        if (!task->wakeup) waitCondition(&task->cond, &task->lock);
        cap = task->cap;
        task->wakeup = rtsFalse;
        RELEASE_LOCK(&task->lock);

        debugTrace(DEBUG_sched, "woken up on capability %d", cap->no);

        ACQUIRE_LOCK(&cap->lock);
        if (cap->running_task != NULL) {
            debugTrace(DEBUG_sched,
                       "capability %d is owned by another task", cap->no);
            RELEASE_LOCK(&cap->lock);
            continue;
        }

        if (task->incall->tso == NULL) {
            ASSERT(cap->spare_workers != NULL);
            // if we're not at the front of the queue, release it
            // again.  This is unlikely to happen.
            if (cap->spare_workers != task) {
                giveCapabilityToTask(cap,cap->spare_workers);
                RELEASE_LOCK(&cap->lock);
                continue;
            }
            cap->spare_workers = task->next;
            task->next = NULL;
        }
        cap->running_task = task;
        RELEASE_LOCK(&cap->lock);
        break;
    }

    debugTrace(DEBUG_sched, "resuming capability %d", cap->no);
    ASSERT(cap->running_task == task);

    *pCap = cap;

    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);

    return;
}
/* ----------------------------------------------------------------------------
 * prodCapability
 *
 * If a Capability is currently idle, wake up a Task on it.  Used to
 * get every Capability into the GC.
 * ------------------------------------------------------------------------- */

void
prodCapability (Capability *cap, Task *task)
{
    ACQUIRE_LOCK(&cap->lock);
    if (!cap->running_task) {
        cap->running_task = task;
        releaseCapability_(cap,rtsTrue);
    }
    RELEASE_LOCK(&cap->lock);
}
/* ----------------------------------------------------------------------------
 * shutdownCapability
 *
 * At shutdown time, we want to let everything exit as cleanly as
 * possible.  For each capability, we let its run queue drain, and
 * allow the workers to stop.
 *
 * This function should be called when interrupted and
 * shutting_down_scheduler = rtsTrue, thus any worker that wakes up
 * will exit the scheduler and call taskStop(), and any bound thread
 * that wakes up will return to its caller.  Runnable threads are
 * killed.
 *
 * ------------------------------------------------------------------------- */

void
shutdownCapability (Capability *cap, Task *task, rtsBool safe)
{
    nat i;

    task->cap = cap;

    // Loop indefinitely until all the workers have exited and there
    // are no Haskell threads left.  We used to bail out after 50
    // iterations of this loop, but that occasionally left a worker
    // running which caused problems later (the closeMutex() below
    // isn't safe, for one thing).

    for (i = 0; /* i < 50 */; i++) {
        ASSERT(sched_state == SCHED_SHUTTING_DOWN);

        debugTrace(DEBUG_sched,
                   "shutting down capability %d, attempt %d", cap->no, i);
        ACQUIRE_LOCK(&cap->lock);
        if (cap->running_task) {
            RELEASE_LOCK(&cap->lock);
            debugTrace(DEBUG_sched, "not owner, yielding");
            yieldThread();
            continue;
        }
        cap->running_task = task;

        if (cap->spare_workers) {
            // Look for workers that have died without removing
            // themselves from the list; this could happen if the OS
            // summarily killed the thread, for example.  This
            // actually happens on Windows when the system is
            // terminating the program, and the RTS is running in a
            // DLL.
            Task *t, *prev;
            prev = NULL;
            for (t = cap->spare_workers; t != NULL; t = t->next) {
                if (!osThreadIsAlive(t->id)) {
                    debugTrace(DEBUG_sched,
                               "worker thread %p has died unexpectedly", (void *)t->id);
                    // unlink the dead worker; only advance prev past
                    // nodes that stay on the list
                    if (!prev) {
                        cap->spare_workers = t->next;
                    } else {
                        prev->next = t->next;
                    }
                } else {
                    prev = t;
                }
            }
        }

        if (!emptyRunQueue(cap) || cap->spare_workers) {
            debugTrace(DEBUG_sched,
                       "runnable threads or workers still alive, yielding");
            releaseCapability_(cap,rtsFalse); // this will wake up a worker
            RELEASE_LOCK(&cap->lock);
            yieldThread();
            continue;
        }

        // If "safe", then busy-wait for any threads currently doing
        // foreign calls.  If we're about to unload this DLL, for
        // example, we need to be sure that there are no OS threads
        // that will try to return to code that has been unloaded.
        // We can be a bit more relaxed when this is a standalone
        // program that is about to terminate, and let safe=false.
        if (cap->suspended_ccalls && safe) {
            debugTrace(DEBUG_sched,
                       "thread(s) are involved in foreign calls, yielding");
            cap->running_task = NULL;
            RELEASE_LOCK(&cap->lock);
            // The IO manager thread might have been slow to start up,
            // so the first attempt to kill it might not have
            // succeeded.  Just in case, try again - the kill message
            // will only be sent once.
            //
            // To reproduce this deadlock: run ffi002(threaded1)
            // repeatedly on a loaded machine.
            ioManagerDie();
            yieldThread();
            continue;
        }

        traceEventShutdown(cap);
        RELEASE_LOCK(&cap->lock);
        break;
    }
    // we now have the Capability; its run queue and spare workers
    // list are both empty.

    // ToDo: we can't drop this mutex, because there might still be
    // threads performing foreign calls that will eventually try to
    // return via resumeThread() and attempt to grab cap->lock.
    // closeMutex(&cap->lock);
}
/* ----------------------------------------------------------------------------
 * tryGrabCapability
 *
 * Attempt to gain control of a Capability if it is free.
 * ------------------------------------------------------------------------- */

rtsBool
tryGrabCapability (Capability *cap, Task *task)
{
    if (cap->running_task != NULL) return rtsFalse;
    ACQUIRE_LOCK(&cap->lock);
    if (cap->running_task != NULL) {
        RELEASE_LOCK(&cap->lock);
        return rtsFalse;
    }
    task->cap = cap;
    cap->running_task = task;
    RELEASE_LOCK(&cap->lock);
    return rtsTrue;
}

#endif /* THREADED_RTS */
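
/* An illustrative sketch (not part of this file): a caller that wants
 * any free Capability can poll the table with tryGrabCapability(),
 * falling back to blocking if none is free.  The helper name below is
 * hypothetical:
 *
 *   static Capability *
 *   exampleGrabAnyCapability (Task *task)
 *   {
 *       nat i;
 *       for (i = 0; i < n_capabilities; i++) {
 *           if (tryGrabCapability(&capabilities[i], task)) {
 *               return &capabilities[i];  // we now own this Capability
 *           }
 *       }
 *       return NULL;  // all busy; caller must wait, e.g. via
 *                     // waitForReturnCapability()
 *   }
 */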
void
freeCapability (Capability *cap)
{
    stgFree(cap->mut_lists);
    stgFree(cap->saved_mut_lists);
#if defined(THREADED_RTS)
    freeSparkPool(cap->sparks);
#endif
}

void
freeCapabilities (void)
{
#if defined(THREADED_RTS)
    nat i;
    for (i=0; i < n_capabilities; i++) {
        freeCapability(&capabilities[i]);
    }
#else
    freeCapability(&MainCapability);
#endif
}
/* ---------------------------------------------------------------------------
   Mark everything directly reachable from the Capabilities.  When
   using multiple GC threads, each of the n GC threads marks the
   Capabilities assigned to it: thread i marks every Capability c for
   which (c `mod` n == i).
   ------------------------------------------------------------------------ */

void
markSomeCapabilities (evac_fn evac, void *user, nat i0, nat delta,
                      rtsBool no_mark_sparks USED_IF_THREADS)
{
    nat i;
    Capability *cap;
    InCall *incall;

    // Each GC thread is responsible for following roots from the
    // Capability of the same number.  There will usually be the same
    // number of Capabilities as GC threads, or fewer, but just in case
    // there are more, we mark every Capability whose number is the GC
    // thread's index plus a multiple of the number of GC threads.
    for (i = i0; i < n_capabilities; i += delta) {
        cap = &capabilities[i];
        evac(user, (StgClosure **)(void *)&cap->run_queue_hd);
        evac(user, (StgClosure **)(void *)&cap->run_queue_tl);
#if defined(THREADED_RTS)
        evac(user, (StgClosure **)(void *)&cap->inbox);
#endif
        for (incall = cap->suspended_ccalls; incall != NULL;
             incall=incall->next) {
            evac(user, (StgClosure **)(void *)&incall->suspended_tso);
        }

#if defined(THREADED_RTS)
        if (!no_mark_sparks) {
            traverseSparkQueue (evac, user, cap);
        }
#endif
    }

#if !defined(THREADED_RTS)
    evac(user, (StgClosure **)(void *)&blocked_queue_hd);
    evac(user, (StgClosure **)(void *)&blocked_queue_tl);
    evac(user, (StgClosure **)(void *)&sleeping_queue);
#endif
}

void
markCapabilities (evac_fn evac, void *user)
{
    markSomeCapabilities(evac, user, 0, 1, rtsFalse);
}
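
/* A minimal sketch of the evac_fn contract assumed above (hypothetical,
 * for illustration only): the GC supplies a callback that is applied to
 * the address of each root pointer, and may overwrite the root with the
 * object's new location.  A trivial instance that merely counts roots:
 *
 *   static void
 *   exampleCountRoot (void *user, StgClosure **root)
 *   {
 *       nat *count = (nat *)user;
 *       if (*root != NULL) (*count)++;
 *   }
 *
 *   // usage:  nat n = 0;  markCapabilities(exampleCountRoot, &n);
 */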