~ubuntu-branches/ubuntu/precise/ghc/precise

« back to all changes in this revision

Viewing changes to rts/Capability.c

  • Committer: Bazaar Package Importer
  • Author(s): Joachim Breitner
  • Date: 2011-01-17 12:49:24 UTC
  • Revision ID: james.westby@ubuntu.com-20110117124924-do1pym1jlf5o636m
Tags: upstream-7.0.1
ImportĀ upstreamĀ versionĀ 7.0.1

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* ---------------------------------------------------------------------------
 
2
 *
 
3
 * (c) The GHC Team, 2003-2006
 
4
 *
 
5
 * Capabilities
 
6
 *
 
7
 * A Capability represent the token required to execute STG code,
 
8
 * and all the state an OS thread/task needs to run Haskell code:
 
9
 * its STG registers, a pointer to its TSO, a nursery etc. During
 
10
 * STG execution, a pointer to the capabilitity is kept in a
 
11
 * register (BaseReg; actually it is a pointer to cap->r).
 
12
 *
 
13
 * Only in an THREADED_RTS build will there be multiple capabilities,
 
14
 * for non-threaded builds there is only one global capability, namely
 
15
 * MainCapability.
 
16
 *
 
17
 * --------------------------------------------------------------------------*/
 
18
 
 
19
#include "PosixSource.h"
 
20
#include "Rts.h"
 
21
 
 
22
#include "Capability.h"
 
23
#include "Schedule.h"
 
24
#include "Sparks.h"
 
25
#include "Trace.h"
 
26
#include "sm/GC.h" // for gcWorkerThread()
 
27
#include "STM.h"
 
28
#include "RtsUtils.h"
 
29
 
 
30
// one global capability, this is the Capability for non-threaded
 
31
// builds, and for +RTS -N1
 
32
Capability MainCapability;
 
33
 
 
34
nat n_capabilities = 0;
 
35
Capability *capabilities = NULL;
 
36
 
 
37
// Holds the Capability which last became free.  This is used so that
 
38
// an in-call has a chance of quickly finding a free Capability.
 
39
// Maintaining a global free list of Capabilities would require global
 
40
// locking, so we don't do that.
 
41
Capability *last_free_capability = NULL;
 
42
 
 
43
/* GC indicator, in scope for the scheduler, init'ed to false */
 
44
volatile StgWord waiting_for_gc = 0;
 
45
 
 
46
/* Let foreign code get the current Capability -- assuming there is one!
 
47
 * This is useful for unsafe foreign calls because they are called with
 
48
 * the current Capability held, but they are not passed it. For example,
 
49
 * see see the integer-gmp package which calls allocateLocal() in its
 
50
 * stgAllocForGMP() function (which gets called by gmp functions).
 
51
 * */
 
52
Capability * rts_unsafeGetMyCapability (void)
 
53
{
 
54
#if defined(THREADED_RTS)
 
55
  return myTask()->cap;
 
56
#else
 
57
  return &MainCapability;
 
58
#endif
 
59
}
 
60
 
 
61
#if defined(THREADED_RTS)
 
62
STATIC_INLINE rtsBool
 
63
globalWorkToDo (void)
 
64
{
 
65
    return sched_state >= SCHED_INTERRUPTING
 
66
        || recent_activity == ACTIVITY_INACTIVE; // need to check for deadlock
 
67
}
 
68
#endif
 
69
 
 
70
#if defined(THREADED_RTS)
 
71
StgClosure *
 
72
findSpark (Capability *cap)
 
73
{
 
74
  Capability *robbed;
 
75
  StgClosurePtr spark;
 
76
  rtsBool retry;
 
77
  nat i = 0;
 
78
 
 
79
  if (!emptyRunQueue(cap) || cap->returning_tasks_hd != NULL) {
 
80
      // If there are other threads, don't try to run any new
 
81
      // sparks: sparks might be speculative, we don't want to take
 
82
      // resources away from the main computation.
 
83
      return 0;
 
84
  }
 
85
 
 
86
  do {
 
87
      retry = rtsFalse;
 
88
 
 
89
      // first try to get a spark from our own pool.
 
90
      // We should be using reclaimSpark(), because it works without
 
91
      // needing any atomic instructions:
 
92
      //   spark = reclaimSpark(cap->sparks);
 
93
      // However, measurements show that this makes at least one benchmark
 
94
      // slower (prsa) and doesn't affect the others.
 
95
      spark = tryStealSpark(cap);
 
96
      if (spark != NULL) {
 
97
          cap->sparks_converted++;
 
98
 
 
99
          // Post event for running a spark from capability's own pool.
 
100
          traceEventRunSpark(cap, cap->r.rCurrentTSO);
 
101
 
 
102
          return spark;
 
103
      }
 
104
      if (!emptySparkPoolCap(cap)) {
 
105
          retry = rtsTrue;
 
106
      }
 
107
 
 
108
      if (n_capabilities == 1) { return NULL; } // makes no sense...
 
109
 
 
110
      debugTrace(DEBUG_sched,
 
111
                 "cap %d: Trying to steal work from other capabilities", 
 
112
                 cap->no);
 
113
 
 
114
      /* visit cap.s 0..n-1 in sequence until a theft succeeds. We could
 
115
      start at a random place instead of 0 as well.  */
 
116
      for ( i=0 ; i < n_capabilities ; i++ ) {
 
117
          robbed = &capabilities[i];
 
118
          if (cap == robbed)  // ourselves...
 
119
              continue;
 
120
 
 
121
          if (emptySparkPoolCap(robbed)) // nothing to steal here
 
122
              continue;
 
123
 
 
124
          spark = tryStealSpark(robbed);
 
125
          if (spark == NULL && !emptySparkPoolCap(robbed)) {
 
126
              // we conflicted with another thread while trying to steal;
 
127
              // try again later.
 
128
              retry = rtsTrue;
 
129
          }
 
130
 
 
131
          if (spark != NULL) {
 
132
              cap->sparks_converted++;
 
133
 
 
134
              traceEventStealSpark(cap, cap->r.rCurrentTSO, robbed->no);
 
135
              
 
136
              return spark;
 
137
          }
 
138
          // otherwise: no success, try next one
 
139
      }
 
140
  } while (retry);
 
141
 
 
142
  debugTrace(DEBUG_sched, "No sparks stolen");
 
143
  return NULL;
 
144
}
 
145
 
 
146
// Returns True if any spark pool is non-empty at this moment in time
 
147
// The result is only valid for an instant, of course, so in a sense
 
148
// is immediately invalid, and should not be relied upon for
 
149
// correctness.
 
150
rtsBool
 
151
anySparks (void)
 
152
{
 
153
    nat i;
 
154
 
 
155
    for (i=0; i < n_capabilities; i++) {
 
156
        if (!emptySparkPoolCap(&capabilities[i])) {
 
157
            return rtsTrue;
 
158
        }
 
159
    }
 
160
    return rtsFalse;
 
161
}
 
162
#endif
 
163
 
 
164
/* -----------------------------------------------------------------------------
 
165
 * Manage the returning_tasks lists.
 
166
 *
 
167
 * These functions require cap->lock
 
168
 * -------------------------------------------------------------------------- */
 
169
 
 
170
#if defined(THREADED_RTS)
 
171
STATIC_INLINE void
 
172
newReturningTask (Capability *cap, Task *task)
 
173
{
 
174
    ASSERT_LOCK_HELD(&cap->lock);
 
175
    ASSERT(task->next == NULL);
 
176
    if (cap->returning_tasks_hd) {
 
177
        ASSERT(cap->returning_tasks_tl->next == NULL);
 
178
        cap->returning_tasks_tl->next = task;
 
179
    } else {
 
180
        cap->returning_tasks_hd = task;
 
181
    }
 
182
    cap->returning_tasks_tl = task;
 
183
}
 
184
 
 
185
STATIC_INLINE Task *
 
186
popReturningTask (Capability *cap)
 
187
{
 
188
    ASSERT_LOCK_HELD(&cap->lock);
 
189
    Task *task;
 
190
    task = cap->returning_tasks_hd;
 
191
    ASSERT(task);
 
192
    cap->returning_tasks_hd = task->next;
 
193
    if (!cap->returning_tasks_hd) {
 
194
        cap->returning_tasks_tl = NULL;
 
195
    }
 
196
    task->next = NULL;
 
197
    return task;
 
198
}
 
199
#endif
 
200
 
 
201
/* ----------------------------------------------------------------------------
 
202
 * Initialisation
 
203
 *
 
204
 * The Capability is initially marked not free.
 
205
 * ------------------------------------------------------------------------- */
 
206
 
 
207
static void
 
208
initCapability( Capability *cap, nat i )
 
209
{
 
210
    nat g;
 
211
 
 
212
    cap->no = i;
 
213
    cap->in_haskell        = rtsFalse;
 
214
 
 
215
    cap->run_queue_hd      = END_TSO_QUEUE;
 
216
    cap->run_queue_tl      = END_TSO_QUEUE;
 
217
 
 
218
#if defined(THREADED_RTS)
 
219
    initMutex(&cap->lock);
 
220
    cap->running_task      = NULL; // indicates cap is free
 
221
    cap->spare_workers     = NULL;
 
222
    cap->suspended_ccalls  = NULL;
 
223
    cap->returning_tasks_hd = NULL;
 
224
    cap->returning_tasks_tl = NULL;
 
225
    cap->inbox              = (Message*)END_TSO_QUEUE;
 
226
    cap->sparks_created     = 0;
 
227
    cap->sparks_converted   = 0;
 
228
    cap->sparks_pruned      = 0;
 
229
#endif
 
230
 
 
231
    cap->f.stgEagerBlackholeInfo = (W_)&__stg_EAGER_BLACKHOLE_info;
 
232
    cap->f.stgGCEnter1     = (StgFunPtr)__stg_gc_enter_1;
 
233
    cap->f.stgGCFun        = (StgFunPtr)__stg_gc_fun;
 
234
 
 
235
    cap->mut_lists  = stgMallocBytes(sizeof(bdescr *) *
 
236
                                     RtsFlags.GcFlags.generations,
 
237
                                     "initCapability");
 
238
    cap->saved_mut_lists = stgMallocBytes(sizeof(bdescr *) *
 
239
                                          RtsFlags.GcFlags.generations,
 
240
                                          "initCapability");
 
241
 
 
242
    for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
 
243
        cap->mut_lists[g] = NULL;
 
244
    }
 
245
 
 
246
    cap->free_tvar_watch_queues = END_STM_WATCH_QUEUE;
 
247
    cap->free_invariant_check_queues = END_INVARIANT_CHECK_QUEUE;
 
248
    cap->free_trec_chunks = END_STM_CHUNK_LIST;
 
249
    cap->free_trec_headers = NO_TREC;
 
250
    cap->transaction_tokens = 0;
 
251
    cap->context_switch = 0;
 
252
    cap->pinned_object_block = NULL;
 
253
}
 
254
 
 
255
/* ---------------------------------------------------------------------------
 
256
 * Function:  initCapabilities()
 
257
 *
 
258
 * Purpose:   set up the Capability handling. For the THREADED_RTS build,
 
259
 *            we keep a table of them, the size of which is
 
260
 *            controlled by the user via the RTS flag -N.
 
261
 *
 
262
 * ------------------------------------------------------------------------- */
 
263
void
 
264
initCapabilities( void )
 
265
{
 
266
#if defined(THREADED_RTS)
 
267
    nat i;
 
268
 
 
269
#ifndef REG_Base
 
270
    // We can't support multiple CPUs if BaseReg is not a register
 
271
    if (RtsFlags.ParFlags.nNodes > 1) {
 
272
        errorBelch("warning: multiple CPUs not supported in this build, reverting to 1");
 
273
        RtsFlags.ParFlags.nNodes = 1;
 
274
    }
 
275
#endif
 
276
 
 
277
    n_capabilities = RtsFlags.ParFlags.nNodes;
 
278
 
 
279
    if (n_capabilities == 1) {
 
280
        capabilities = &MainCapability;
 
281
        // THREADED_RTS must work on builds that don't have a mutable
 
282
        // BaseReg (eg. unregisterised), so in this case
 
283
        // capabilities[0] must coincide with &MainCapability.
 
284
    } else {
 
285
        capabilities = stgMallocBytes(n_capabilities * sizeof(Capability),
 
286
                                      "initCapabilities");
 
287
    }
 
288
 
 
289
    for (i = 0; i < n_capabilities; i++) {
 
290
        initCapability(&capabilities[i], i);
 
291
    }
 
292
 
 
293
    debugTrace(DEBUG_sched, "allocated %d capabilities", n_capabilities);
 
294
 
 
295
#else /* !THREADED_RTS */
 
296
 
 
297
    n_capabilities = 1;
 
298
    capabilities = &MainCapability;
 
299
    initCapability(&MainCapability, 0);
 
300
 
 
301
#endif
 
302
 
 
303
    // There are no free capabilities to begin with.  We will start
 
304
    // a worker Task to each Capability, which will quickly put the
 
305
    // Capability on the free list when it finds nothing to do.
 
306
    last_free_capability = &capabilities[0];
 
307
}
 
308
 
 
309
/* ----------------------------------------------------------------------------
 
310
 * setContextSwitches: cause all capabilities to context switch as
 
311
 * soon as possible.
 
312
 * ------------------------------------------------------------------------- */
 
313
 
 
314
void setContextSwitches(void)
 
315
{
 
316
    nat i;
 
317
    for (i=0; i < n_capabilities; i++) {
 
318
        contextSwitchCapability(&capabilities[i]);
 
319
    }
 
320
}
 
321
 
 
322
/* ----------------------------------------------------------------------------
 
323
 * Give a Capability to a Task.  The task must currently be sleeping
 
324
 * on its condition variable.
 
325
 *
 
326
 * Requires cap->lock (modifies cap->running_task).
 
327
 *
 
328
 * When migrating a Task, the migrater must take task->lock before
 
329
 * modifying task->cap, to synchronise with the waking up Task.
 
330
 * Additionally, the migrater should own the Capability (when
 
331
 * migrating the run queue), or cap->lock (when migrating
 
332
 * returning_workers).
 
333
 *
 
334
 * ------------------------------------------------------------------------- */
 
335
 
 
336
#if defined(THREADED_RTS)
 
337
STATIC_INLINE void
 
338
giveCapabilityToTask (Capability *cap USED_IF_DEBUG, Task *task)
 
339
{
 
340
    ASSERT_LOCK_HELD(&cap->lock);
 
341
    ASSERT(task->cap == cap);
 
342
    debugTrace(DEBUG_sched, "passing capability %d to %s %p",
 
343
               cap->no, task->incall->tso ? "bound task" : "worker",
 
344
               (void *)task->id);
 
345
    ACQUIRE_LOCK(&task->lock);
 
346
    task->wakeup = rtsTrue;
 
347
    // the wakeup flag is needed because signalCondition() doesn't
 
348
    // flag the condition if the thread is already runniing, but we want
 
349
    // it to be sticky.
 
350
    signalCondition(&task->cond);
 
351
    RELEASE_LOCK(&task->lock);
 
352
}
 
353
#endif
 
354
 
 
355
/* ----------------------------------------------------------------------------
 
356
 * Function:  releaseCapability(Capability*)
 
357
 *
 
358
 * Purpose:   Letting go of a capability. Causes a
 
359
 *            'returning worker' thread or a 'waiting worker'
 
360
 *            to wake up, in that order.
 
361
 * ------------------------------------------------------------------------- */
 
362
 
 
363
#if defined(THREADED_RTS)
 
364
void
 
365
releaseCapability_ (Capability* cap, 
 
366
                    rtsBool always_wakeup)
 
367
{
 
368
    Task *task;
 
369
 
 
370
    task = cap->running_task;
 
371
 
 
372
    ASSERT_PARTIAL_CAPABILITY_INVARIANTS(cap,task);
 
373
 
 
374
    cap->running_task = NULL;
 
375
 
 
376
    // Check to see whether a worker thread can be given
 
377
    // the go-ahead to return the result of an external call..
 
378
    if (cap->returning_tasks_hd != NULL) {
 
379
        giveCapabilityToTask(cap,cap->returning_tasks_hd);
 
380
        // The Task pops itself from the queue (see waitForReturnCapability())
 
381
        return;
 
382
    }
 
383
 
 
384
    if (waiting_for_gc == PENDING_GC_SEQ) {
 
385
      last_free_capability = cap; // needed?
 
386
      debugTrace(DEBUG_sched, "GC pending, set capability %d free", cap->no);
 
387
      return;
 
388
    } 
 
389
 
 
390
 
 
391
    // If the next thread on the run queue is a bound thread,
 
392
    // give this Capability to the appropriate Task.
 
393
    if (!emptyRunQueue(cap) && cap->run_queue_hd->bound) {
 
394
        // Make sure we're not about to try to wake ourselves up
 
395
        // ASSERT(task != cap->run_queue_hd->bound);
 
396
        // assertion is false: in schedule() we force a yield after
 
397
        // ThreadBlocked, but the thread may be back on the run queue
 
398
        // by now.
 
399
        task = cap->run_queue_hd->bound->task;
 
400
        giveCapabilityToTask(cap,task);
 
401
        return;
 
402
    }
 
403
 
 
404
    if (!cap->spare_workers) {
 
405
        // Create a worker thread if we don't have one.  If the system
 
406
        // is interrupted, we only create a worker task if there
 
407
        // are threads that need to be completed.  If the system is
 
408
        // shutting down, we never create a new worker.
 
409
        if (sched_state < SCHED_SHUTTING_DOWN || !emptyRunQueue(cap)) {
 
410
            debugTrace(DEBUG_sched,
 
411
                       "starting new worker on capability %d", cap->no);
 
412
            startWorkerTask(cap);
 
413
            return;
 
414
        }
 
415
    }
 
416
 
 
417
    // If we have an unbound thread on the run queue, or if there's
 
418
    // anything else to do, give the Capability to a worker thread.
 
419
    if (always_wakeup || 
 
420
        !emptyRunQueue(cap) || !emptyInbox(cap) ||
 
421
        !emptySparkPoolCap(cap) || globalWorkToDo()) {
 
422
        if (cap->spare_workers) {
 
423
            giveCapabilityToTask(cap,cap->spare_workers);
 
424
            // The worker Task pops itself from the queue;
 
425
            return;
 
426
        }
 
427
    }
 
428
 
 
429
    last_free_capability = cap;
 
430
    debugTrace(DEBUG_sched, "freeing capability %d", cap->no);
 
431
}
 
432
 
 
433
void
 
434
releaseCapability (Capability* cap USED_IF_THREADS)
 
435
{
 
436
    ACQUIRE_LOCK(&cap->lock);
 
437
    releaseCapability_(cap, rtsFalse);
 
438
    RELEASE_LOCK(&cap->lock);
 
439
}
 
440
 
 
441
void
 
442
releaseAndWakeupCapability (Capability* cap USED_IF_THREADS)
 
443
{
 
444
    ACQUIRE_LOCK(&cap->lock);
 
445
    releaseCapability_(cap, rtsTrue);
 
446
    RELEASE_LOCK(&cap->lock);
 
447
}
 
448
 
 
449
static void
 
450
releaseCapabilityAndQueueWorker (Capability* cap USED_IF_THREADS)
 
451
{
 
452
    Task *task;
 
453
 
 
454
    ACQUIRE_LOCK(&cap->lock);
 
455
 
 
456
    task = cap->running_task;
 
457
 
 
458
    // If the current task is a worker, save it on the spare_workers
 
459
    // list of this Capability.  A worker can mark itself as stopped,
 
460
    // in which case it is not replaced on the spare_worker queue.
 
461
    // This happens when the system is shutting down (see
 
462
    // Schedule.c:workerStart()).
 
463
    if (!isBoundTask(task) && !task->stopped) {
 
464
        task->next = cap->spare_workers;
 
465
        cap->spare_workers = task;
 
466
    }
 
467
    // Bound tasks just float around attached to their TSOs.
 
468
 
 
469
    releaseCapability_(cap,rtsFalse);
 
470
 
 
471
    RELEASE_LOCK(&cap->lock);
 
472
}
 
473
#endif
 
474
 
 
475
/* ----------------------------------------------------------------------------
 
476
 * waitForReturnCapability( Task *task )
 
477
 *
 
478
 * Purpose:  when an OS thread returns from an external call,
 
479
 * it calls waitForReturnCapability() (via Schedule.resumeThread())
 
480
 * to wait for permission to enter the RTS & communicate the
 
481
 * result of the external call back to the Haskell thread that
 
482
 * made it.
 
483
 *
 
484
 * ------------------------------------------------------------------------- */
 
485
void
 
486
waitForReturnCapability (Capability **pCap, Task *task)
 
487
{
 
488
#if !defined(THREADED_RTS)
 
489
 
 
490
    MainCapability.running_task = task;
 
491
    task->cap = &MainCapability;
 
492
    *pCap = &MainCapability;
 
493
 
 
494
#else
 
495
    Capability *cap = *pCap;
 
496
 
 
497
    if (cap == NULL) {
 
498
        // Try last_free_capability first
 
499
        cap = last_free_capability;
 
500
        if (cap->running_task) {
 
501
            nat i;
 
502
            // otherwise, search for a free capability
 
503
            cap = NULL;
 
504
            for (i = 0; i < n_capabilities; i++) {
 
505
                if (!capabilities[i].running_task) {
 
506
                    cap = &capabilities[i];
 
507
                    break;
 
508
                }
 
509
            }
 
510
            if (cap == NULL) {
 
511
                // Can't find a free one, use last_free_capability.
 
512
                cap = last_free_capability;
 
513
            }
 
514
        }
 
515
 
 
516
        // record the Capability as the one this Task is now assocated with.
 
517
        task->cap = cap;
 
518
 
 
519
    } else {
 
520
        ASSERT(task->cap == cap);
 
521
    }
 
522
 
 
523
    ACQUIRE_LOCK(&cap->lock);
 
524
 
 
525
    debugTrace(DEBUG_sched, "returning; I want capability %d", cap->no);
 
526
 
 
527
    if (!cap->running_task) {
 
528
        // It's free; just grab it
 
529
        cap->running_task = task;
 
530
        RELEASE_LOCK(&cap->lock);
 
531
    } else {
 
532
        newReturningTask(cap,task);
 
533
        RELEASE_LOCK(&cap->lock);
 
534
 
 
535
        for (;;) {
 
536
            ACQUIRE_LOCK(&task->lock);
 
537
            // task->lock held, cap->lock not held
 
538
            if (!task->wakeup) waitCondition(&task->cond, &task->lock);
 
539
            cap = task->cap;
 
540
            task->wakeup = rtsFalse;
 
541
            RELEASE_LOCK(&task->lock);
 
542
 
 
543
            // now check whether we should wake up...
 
544
            ACQUIRE_LOCK(&cap->lock);
 
545
            if (cap->running_task == NULL) {
 
546
                if (cap->returning_tasks_hd != task) {
 
547
                    giveCapabilityToTask(cap,cap->returning_tasks_hd);
 
548
                    RELEASE_LOCK(&cap->lock);
 
549
                    continue;
 
550
                }
 
551
                cap->running_task = task;
 
552
                popReturningTask(cap);
 
553
                RELEASE_LOCK(&cap->lock);
 
554
                break;
 
555
            }
 
556
            RELEASE_LOCK(&cap->lock);
 
557
        }
 
558
 
 
559
    }
 
560
 
 
561
    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
 
562
 
 
563
    debugTrace(DEBUG_sched, "resuming capability %d", cap->no);
 
564
 
 
565
    *pCap = cap;
 
566
#endif
 
567
}
 
568
 
 
569
#if defined(THREADED_RTS)
 
570
/* ----------------------------------------------------------------------------
 
571
 * yieldCapability
 
572
 * ------------------------------------------------------------------------- */
 
573
 
 
574
void
 
575
yieldCapability (Capability** pCap, Task *task)
 
576
{
 
577
    Capability *cap = *pCap;
 
578
 
 
579
    if (waiting_for_gc == PENDING_GC_PAR) {
 
580
        traceEventGcStart(cap);
 
581
        gcWorkerThread(cap);
 
582
        traceEventGcEnd(cap);
 
583
        return;
 
584
    }
 
585
 
 
586
        debugTrace(DEBUG_sched, "giving up capability %d", cap->no);
 
587
 
 
588
        // We must now release the capability and wait to be woken up
 
589
        // again.
 
590
        task->wakeup = rtsFalse;
 
591
        releaseCapabilityAndQueueWorker(cap);
 
592
 
 
593
        for (;;) {
 
594
            ACQUIRE_LOCK(&task->lock);
 
595
            // task->lock held, cap->lock not held
 
596
            if (!task->wakeup) waitCondition(&task->cond, &task->lock);
 
597
            cap = task->cap;
 
598
            task->wakeup = rtsFalse;
 
599
            RELEASE_LOCK(&task->lock);
 
600
 
 
601
            debugTrace(DEBUG_sched, "woken up on capability %d", cap->no);
 
602
 
 
603
            ACQUIRE_LOCK(&cap->lock);
 
604
            if (cap->running_task != NULL) {
 
605
                debugTrace(DEBUG_sched, 
 
606
                           "capability %d is owned by another task", cap->no);
 
607
                RELEASE_LOCK(&cap->lock);
 
608
                continue;
 
609
            }
 
610
 
 
611
            if (task->incall->tso == NULL) {
 
612
                ASSERT(cap->spare_workers != NULL);
 
613
                // if we're not at the front of the queue, release it
 
614
                // again.  This is unlikely to happen.
 
615
                if (cap->spare_workers != task) {
 
616
                    giveCapabilityToTask(cap,cap->spare_workers);
 
617
                    RELEASE_LOCK(&cap->lock);
 
618
                    continue;
 
619
                }
 
620
                cap->spare_workers = task->next;
 
621
                task->next = NULL;
 
622
            }
 
623
            cap->running_task = task;
 
624
            RELEASE_LOCK(&cap->lock);
 
625
            break;
 
626
        }
 
627
 
 
628
        debugTrace(DEBUG_sched, "resuming capability %d", cap->no);
 
629
        ASSERT(cap->running_task == task);
 
630
 
 
631
    *pCap = cap;
 
632
 
 
633
    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
 
634
 
 
635
    return;
 
636
}
 
637
 
 
638
/* ----------------------------------------------------------------------------
 
639
 * prodCapability
 
640
 *
 
641
 * If a Capability is currently idle, wake up a Task on it.  Used to 
 
642
 * get every Capability into the GC.
 
643
 * ------------------------------------------------------------------------- */
 
644
 
 
645
void
 
646
prodCapability (Capability *cap, Task *task)
 
647
{
 
648
    ACQUIRE_LOCK(&cap->lock);
 
649
    if (!cap->running_task) {
 
650
        cap->running_task = task;
 
651
        releaseCapability_(cap,rtsTrue);
 
652
    }
 
653
    RELEASE_LOCK(&cap->lock);
 
654
}
 
655
 
 
656
/* ----------------------------------------------------------------------------
 
657
 * shutdownCapability
 
658
 *
 
659
 * At shutdown time, we want to let everything exit as cleanly as
 
660
 * possible.  For each capability, we let its run queue drain, and
 
661
 * allow the workers to stop.
 
662
 *
 
663
 * This function should be called when interrupted and
 
664
 * shutting_down_scheduler = rtsTrue, thus any worker that wakes up
 
665
 * will exit the scheduler and call taskStop(), and any bound thread
 
666
 * that wakes up will return to its caller.  Runnable threads are
 
667
 * killed.
 
668
 *
 
669
 * ------------------------------------------------------------------------- */
 
670
 
 
671
void
 
672
shutdownCapability (Capability *cap, Task *task, rtsBool safe)
 
673
{
 
674
    nat i;
 
675
 
 
676
    task->cap = cap;
 
677
 
 
678
    // Loop indefinitely until all the workers have exited and there
 
679
    // are no Haskell threads left.  We used to bail out after 50
 
680
    // iterations of this loop, but that occasionally left a worker
 
681
    // running which caused problems later (the closeMutex() below
 
682
    // isn't safe, for one thing).
 
683
 
 
684
    for (i = 0; /* i < 50 */; i++) {
 
685
        ASSERT(sched_state == SCHED_SHUTTING_DOWN);
 
686
 
 
687
        debugTrace(DEBUG_sched, 
 
688
                   "shutting down capability %d, attempt %d", cap->no, i);
 
689
        ACQUIRE_LOCK(&cap->lock);
 
690
        if (cap->running_task) {
 
691
            RELEASE_LOCK(&cap->lock);
 
692
            debugTrace(DEBUG_sched, "not owner, yielding");
 
693
            yieldThread();
 
694
            continue;
 
695
        }
 
696
        cap->running_task = task;
 
697
 
 
698
        if (cap->spare_workers) {
 
699
            // Look for workers that have died without removing
 
700
            // themselves from the list; this could happen if the OS
 
701
            // summarily killed the thread, for example.  This
 
702
            // actually happens on Windows when the system is
 
703
            // terminating the program, and the RTS is running in a
 
704
            // DLL.
 
705
            Task *t, *prev;
 
706
            prev = NULL;
 
707
            for (t = cap->spare_workers; t != NULL; t = t->next) {
 
708
                if (!osThreadIsAlive(t->id)) {
 
709
                    debugTrace(DEBUG_sched, 
 
710
                               "worker thread %p has died unexpectedly", (void *)t->id);
 
711
                        if (!prev) {
 
712
                            cap->spare_workers = t->next;
 
713
                        } else {
 
714
                            prev->next = t->next;
 
715
                        }
 
716
                        prev = t;
 
717
                }
 
718
            }
 
719
        }
 
720
 
 
721
        if (!emptyRunQueue(cap) || cap->spare_workers) {
 
722
            debugTrace(DEBUG_sched, 
 
723
                       "runnable threads or workers still alive, yielding");
 
724
            releaseCapability_(cap,rtsFalse); // this will wake up a worker
 
725
            RELEASE_LOCK(&cap->lock);
 
726
            yieldThread();
 
727
            continue;
 
728
        }
 
729
 
 
730
        // If "safe", then busy-wait for any threads currently doing
 
731
        // foreign calls.  If we're about to unload this DLL, for
 
732
        // example, we need to be sure that there are no OS threads
 
733
        // that will try to return to code that has been unloaded.
 
734
        // We can be a bit more relaxed when this is a standalone
 
735
        // program that is about to terminate, and let safe=false.
 
736
        if (cap->suspended_ccalls && safe) {
 
737
            debugTrace(DEBUG_sched, 
 
738
                       "thread(s) are involved in foreign calls, yielding");
 
739
            cap->running_task = NULL;
 
740
            RELEASE_LOCK(&cap->lock);
 
741
            // The IO manager thread might have been slow to start up,
 
742
            // so the first attempt to kill it might not have
 
743
            // succeeded.  Just in case, try again - the kill message
 
744
            // will only be sent once.
 
745
            //
 
746
            // To reproduce this deadlock: run ffi002(threaded1)
 
747
            // repeatedly on a loaded machine.
 
748
            ioManagerDie();
 
749
            yieldThread();
 
750
            continue;
 
751
        }
 
752
 
 
753
        traceEventShutdown(cap);
 
754
        RELEASE_LOCK(&cap->lock);
 
755
        break;
 
756
    }
 
757
    // we now have the Capability, its run queue and spare workers
 
758
    // list are both empty.
 
759
 
 
760
    // ToDo: we can't drop this mutex, because there might still be
 
761
    // threads performing foreign calls that will eventually try to 
 
762
    // return via resumeThread() and attempt to grab cap->lock.
 
763
    // closeMutex(&cap->lock);
 
764
}
 
765
 
 
766
/* ----------------------------------------------------------------------------
 
767
 * tryGrabCapability
 
768
 *
 
769
 * Attempt to gain control of a Capability if it is free.
 
770
 *
 
771
 * ------------------------------------------------------------------------- */
 
772
 
 
773
rtsBool
 
774
tryGrabCapability (Capability *cap, Task *task)
 
775
{
 
776
    if (cap->running_task != NULL) return rtsFalse;
 
777
    ACQUIRE_LOCK(&cap->lock);
 
778
    if (cap->running_task != NULL) {
 
779
        RELEASE_LOCK(&cap->lock);
 
780
        return rtsFalse;
 
781
    }
 
782
    task->cap = cap;
 
783
    cap->running_task = task;
 
784
    RELEASE_LOCK(&cap->lock);
 
785
    return rtsTrue;
 
786
}
 
787
 
 
788
 
 
789
#endif /* THREADED_RTS */
 
790
 
 
791
static void
 
792
freeCapability (Capability *cap)
 
793
{
 
794
    stgFree(cap->mut_lists);
 
795
    stgFree(cap->saved_mut_lists);
 
796
#if defined(THREADED_RTS)
 
797
    freeSparkPool(cap->sparks);
 
798
#endif
 
799
}
 
800
 
 
801
void
 
802
freeCapabilities (void)
 
803
{
 
804
#if defined(THREADED_RTS)
 
805
    nat i;
 
806
    for (i=0; i < n_capabilities; i++) {
 
807
        freeCapability(&capabilities[i]);
 
808
    }
 
809
#else
 
810
    freeCapability(&MainCapability);
 
811
#endif
 
812
}
 
813
 
 
814
/* ---------------------------------------------------------------------------
 
815
   Mark everything directly reachable from the Capabilities.  When
 
816
   using multiple GC threads, each GC thread marks all Capabilities
 
817
   for which (c `mod` n == 0), for Capability c and thread n.
 
818
   ------------------------------------------------------------------------ */
 
819
 
 
820
void
 
821
markSomeCapabilities (evac_fn evac, void *user, nat i0, nat delta, 
 
822
                      rtsBool no_mark_sparks USED_IF_THREADS)
 
823
{
 
824
    nat i;
 
825
    Capability *cap;
 
826
    InCall *incall;
 
827
 
 
828
    // Each GC thread is responsible for following roots from the
 
829
    // Capability of the same number.  There will usually be the same
 
830
    // or fewer Capabilities as GC threads, but just in case there
 
831
    // are more, we mark every Capability whose number is the GC
 
832
    // thread's index plus a multiple of the number of GC threads.
 
833
    for (i = i0; i < n_capabilities; i += delta) {
 
834
        cap = &capabilities[i];
 
835
        evac(user, (StgClosure **)(void *)&cap->run_queue_hd);
 
836
        evac(user, (StgClosure **)(void *)&cap->run_queue_tl);
 
837
#if defined(THREADED_RTS)
 
838
        evac(user, (StgClosure **)(void *)&cap->inbox);
 
839
#endif
 
840
        for (incall = cap->suspended_ccalls; incall != NULL; 
 
841
             incall=incall->next) {
 
842
            evac(user, (StgClosure **)(void *)&incall->suspended_tso);
 
843
        }
 
844
 
 
845
#if defined(THREADED_RTS)
 
846
        if (!no_mark_sparks) {
 
847
            traverseSparkQueue (evac, user, cap);
 
848
        }
 
849
#endif
 
850
    }
 
851
 
 
852
#if !defined(THREADED_RTS)
 
853
    evac(user, (StgClosure **)(void *)&blocked_queue_hd);
 
854
    evac(user, (StgClosure **)(void *)&blocked_queue_tl);
 
855
    evac(user, (StgClosure **)(void *)&sleeping_queue);
 
856
#endif 
 
857
}
 
858
 
 
859
void
 
860
markCapabilities (evac_fn evac, void *user)
 
861
{
 
862
    markSomeCapabilities(evac, user, 0, 1, rtsFalse);
 
863
}
 
864
 
 
865
/* -----------------------------------------------------------------------------
 
866
   Messages
 
867
   -------------------------------------------------------------------------- */
 
868