~martin-decky/helenos/rcu

« back to all changes in this revision

Viewing changes to kernel/generic/src/synch/rcu.c

  • Committer: Martin Decky
  • Date: 2012-06-20 18:07:24 UTC
  • Revision ID: martin@decky.cz-20120620180724-vhctn1c30vh1qayh
adopt RCU for HelenOS specifics (no CPUs in motion)

Show diffs side-by-side

added added

removed removed

Lines of Context:
272
272
 
273
273
static void rcu_advance_callbacks(void)
274
274
{
275
 
        rcu_t *list = CPU->rcu.curlist;
276
 
        
277
 
        if (list) {
278
 
                rcu_t *next;
279
 
                rcu_t *failed_list = NULL;
280
 
                rcu_t **failed_end = &failed_list;
281
 
                atomic_count_t weight_sum = 0;
282
 
                
283
 
                /*
284
 
                 * The following volatile cast makes sure that the workq
285
 
                 * variable really exists. If it did not exist, the compiler
286
 
                 * could (re-)read it from rcu_cb.workq on each access.
287
 
                 * This would break the "enqueued in order" promise, since
288
 
                 * callbacks could exist in the old queue and a new one
289
 
                 * at the same time.
290
 
                 */
291
 
                workq_t *workq = RCU_VOLATILE(workq_t *, rcu_cb.workq);
292
 
                
293
 
                /*
294
 
                 * This is where the nextlist from last grace period is
295
 
                 * actually terminated.
296
 
                 */
297
 
                *CPU->rcu.curtail = NULL;
298
 
                do {
299
 
                        /* Grab the 'next' pointer and weight... */
300
 
                        next = list->next;
301
 
                        weight_sum += atomic_get(&list->weight);
302
 
                        
303
 
                        switch (list->type) {
304
 
                        case RCU_CB_DEFAULT:
305
 
                                list->func(list);
306
 
                                break;
307
 
                        case RCU_CB_EXCL:
308
 
                                /*
309
 
                                 * This may fail under heavy load and/or memory
310
 
                                 * pressure. However, we do not want to sleep,
311
 
                                 * so that all the non-exclusive callbacks are
312
 
                                 * processed as soon as possible.
313
 
                                 */
314
 
                                if (!workq_dispatch(workq, (workq_fn_t) list->func,
315
 
                                    list, WORKQ_FLAG_NOQUEUE)) {
316
 
                                        *failed_end = list;
317
 
                                        failed_end = &list->next;
318
 
                                        list = next;
319
 
                                        goto failure;
320
 
                                }
321
 
                                
322
 
                                break;
323
 
                        }
324
 
                } while ((list = next) != NULL);
325
 
                
326
 
                /*
327
 
                 * Reaching this place means that nothing failed above.
328
 
                 * No need to handle failures.
329
 
                 */
330
 
                goto out;
331
 
                
332
 
                /*
333
 
                 * The following loop executes iff a failure occurred above.
334
 
                 * We do not attempt to enqueue the exclusive callbacks after
335
 
                 * the first failure, since it could mess up their order.
336
 
                 * Instead we just handle the standard callbacks expediently
337
 
                 * and sleep waiting for the queue later on, with the correct
338
 
                 * order of the exclusive callbacks.
339
 
                 */
340
 
failure:
341
 
                for (; list; list = next) {
342
 
                        next = list->next;
343
 
                        weight_sum += atomic_get(&list->weight);
344
 
                        
345
 
                        switch (list->type) {
346
 
                        case RCU_CB_DEFAULT:
347
 
                                list->func(list);
348
 
                                break;
349
 
                        case RCU_CB_EXCL:
350
 
                                *failed_end = list;
351
 
                                failed_end = &list->next;
352
 
                                break;
353
 
                        }
 
275
        /*
 
276
         * The following volatile cast makes sure that the workq
 
277
         * variable really exists. If it did not exist, the compiler
 
278
         * could (re-)read it from rcu_cb.workq on each access.
 
279
         * This would break the "enqueued in order" promise, since
 
280
         * callbacks could exist in the old queue and a new one
 
281
         * at the same time.
 
282
         */
 
283
        // FIXME is this necessary if no CPU is in motion?
 
284
        workq_t *workq = RCU_VOLATILE(workq_t *, rcu_cb.workq);
 
285
        
 
286
        list_t *list = &CPU->rcu.curlist;
 
287
        atomic_count_t weight_sum = 0;
 
288
        
 
289
        while (!list_empty(list)) {
 
290
                rcu_t *item =
 
291
                    list_get_instance(list_first(list), rcu_t, link);
 
292
                
 
293
                ASSERT(item);
 
294
                
 
295
                // FIXME is it OK to sum the weight if we can fail?
 
296
                weight_sum += item->weight;
 
297
                
 
298
                switch (item->type) {
 
299
                case RCU_CB_DEFAULT:
 
300
                        item->func(item);
 
301
                        break;
 
302
                case RCU_CB_EXCL:
 
303
                        /*
 
304
                         * This may fail under heavy load and/or memory
 
305
                         * pressure. However, we do not want to sleep,
 
306
                         * so that all the non-exclusive callbacks are
 
307
                         * processed as soon as possible.
 
308
                         */
 
309
                        if (!workq_dispatch(workq, (workq_fn_t) item->func,
 
310
                            item, WORKQ_FLAG_NOQUEUE))
 
311
                                goto fail;
 
312
                        
 
313
                        break;
354
314
                }
355
315
                
356
 
                /*
357
 
                 * If we could not enqueue some callbacks above, retry it now,
358
 
                 * this time waiting as long as needed. We know for sure that
359
 
                 * failed_list is not null when we get here. Note that all the
360
 
                 * weight sum has already been computed above.
361
 
                 */
362
 
                *failed_end = NULL;
363
 
                
364
 
                do {
365
 
                        next = failed_list->next;
366
 
                        (void) workq_dispatch(workq, (workq_fn_t) failed_list->func,
367
 
                            failed_list, 0);
368
 
                } while ((failed_list = next) != NULL);
369
 
                
370
 
                /*
371
 
                 * Subtract the weight atomically.
372
 
                 */
373
 
out:
374
 
                atomic_postsub(&CPU->rcu.weight, weight_sum);
375
 
        }
376
 
        
 
316
                list_remove(&item->link);
 
317
        }
 
318
        
 
319
fail:
 
320
        /*
 
321
         * The following loops execute iff a failure occurred above.
 
322
         * We do not attempt to enqueue the exclusive callbacks after
 
323
         * the first failure, since it could mess up their order.
 
324
         * Instead we just handle the standard callbacks expediently
 
325
         * and sleep waiting for the queue later on, with the correct
 
326
         * order of the exclusive callbacks.
 
327
         */
 
328
        list_foreach(*list, link) {
 
329
                rcu_t *item = list_get_instance(link, rcu_t, link);
 
330
                
 
331
                ASSERT(item);
 
332
                
 
333
                weight_sum += item->weight;
 
334
                
 
335
                if (item->type == RCU_CB_DEFAULT)
 
336
                        item->func(item);
 
337
        }
 
338
        
 
339
        /*
 
340
         * If we could not enqueue some callbacks above, retry it now,
 
341
         * this time waiting as long as needed. Note that all the
 
342
         * weight sum has already been computed above.
 
343
         */
 
344
        while (!list_empty(list)) {
 
345
                rcu_t *item =
 
346
                    list_get_instance(list_first(list), rcu_t, link);
 
347
                
 
348
                ASSERT(item);
 
349
                
 
350
                if (item->type == RCU_CB_EXCL)
 
351
                        (void) workq_dispatch(workq, (workq_fn_t) item->func,
 
352
                            item, WORKQ_FLAG_NONE);
 
353
                
 
354
                list_remove(&item->link);
 
355
        }
 
356
        
 
357
        /*
 
358
         * Subtract the weight atomically.
 
359
         */
 
360
        atomic_postsub(&CPU->rcu.weight, weight_sum);
 
361
        
 
362
#if 0
377
363
        CPU->rcu.curlist = RCU_VOLATILE(rcu_t *, CPU->rcu.nextlist);
378
364
        if (CPU->rcu.curlist) {
379
365
                /*
390
376
         * End of the rcu_curlist remains undefined here. There is no
391
377
         * reason to assign it when rcu_curlist is null.
392
378
         */
 
379
#endif
393
380
}
394
381
 
395
382
static void rcu_reclaimer(void *arg)
416
403
        CPU->rcu.gp_ctr = rcu_cb.gp_ctr;
417
404
        
418
405
        while (true) {
419
 
                if (!((CPU->rcu.nextlist) || (CPU->rcu.curlist))) {
 
406
                if ((list_empty(&CPU->rcu.nextlist)) &&
 
407
                    (list_empty(&CPU->rcu.curlist))) {
420
408
                        do {
421
409
                                if (CPU->rcu.sync)
422
410
                                        rcu_handle_sync();
425
413
                                semaphore_down(&CPU->rcu.queue_sema);
426
414
                                mutex_lock(&CPU->rcu.lock);
427
415
                                
428
 
                        } while (!CPU->rcu.nextlist);
 
416
                        } while (list_empty(&CPU->rcu.nextlist));
429
417
                } else {
430
418
                        /*
431
419
                         * So that reading gp_ctr is really safe, not
504
492
#endif /* CONFIG_SMP */
505
493
        
506
494
        CPU->rcu.reclaimer = NULL;
507
 
        CPU->rcu.relay_cpu = NULL;
508
 
        CPU->rcu.nextlist = NULL;
509
 
        CPU->rcu.nexttail = &CPU->rcu.nextlist;
510
 
        CPU->rcu.curlist = NULL;
511
 
        CPU->rcu.curtail = NULL;
 
495
        
 
496
        list_initialize(&CPU->rcu.curlist);
 
497
        list_initialize(&CPU->rcu.nextlist);
512
498
        atomic_set(&CPU->rcu.weight, 0);
 
499
        
513
500
        CPU->rcu.nesting = 0;
514
501
        CPU->rcu.gp_ctr = rcu_cb.gp_ctr;
515
502
        CPU->rcu.sync = NULL;
531
518
        thread_ready(CPU->rcu.reclaimer);
532
519
}
533
520
 
534
 
#if 0
535
 
void rcu_init_callb(void)
536
 
{
537
 
        (void) callb_add(rcu_kadmin_event, &rcu_cb,
538
 
            CB_CL_UADMIN_PRE_VFS, "rcu_cb_vfs");
539
 
}
540
 
#endif
541
 
 
542
521
static inline void rcu_announce_qs(void)
543
522
{
544
523
        ASSERT(PREEMPTION_DISABLED);
566
545
        preemption_enable();
567
546
}
568
547
 
569
 
static inline void rcu_init_arg(rcu_cb_t callb, rcu_t *arg,
570
 
    atomic_count_t weight, rcu_cb_type_t type)
 
548
static inline void rcu_initialize(rcu_t *arg, rcu_cb_type_t type,
 
549
    rcu_cb_t callb, atomic_count_t weight)
571
550
{
572
551
        arg->type = type;
573
552
        arg->func = callb;
574
 
        atomic_set(&arg->weight, weight);
575
 
}
576
 
 
577
 
/*
578
 
 * The following must always be called with
579
 
 *   - preemption disabled (when manipulating current CPU's data).
580
 
 *   - preemption disabled *and* rcu_nesting > 0
581
 
 *     (when manipulating other CPU's data).
582
 
 */
583
 
static inline int rcu_enqueue_callback(cpu_t *cpu, rcu_t *append,
584
 
    rcu_t **append_tail)
585
 
{
586
 
        ASSERT(PREEMPTION_DISABLED);
587
 
        
588
 
        rcu_t **oldtail = atomic_swap_ptr(&cpu->rcu.nexttail,
589
 
            append_tail);
590
 
        
591
 
        /*
592
 
         * This might be a race window. It remains closed when the reclaimer
593
 
         * runs on the same CPU and preemption_disable() is used. Else
594
 
         * preemption_disable() does not close the window.
595
 
         *
596
 
         * Fortunately, preemption_disable() prevents a grace period from ending
597
 
         * too early. Every CPU (including this one) will issue a couple of
598
 
         * memory barriers (in both readers and reclaimers) before the grace
599
 
         * period can end. Particularly, this function will have to have ended
600
 
         * (and its changes reached global visibility) before the enqueued
601
 
         * callbcack can be processed. So the uninitialized gap in the linked
602
 
         * list will have been filled by the time the reclaimer starts
603
 
         * processing the list.
604
 
         */
605
 
        *oldtail = append;
606
 
        return (oldtail == &cpu->rcu.nextlist);
 
553
        arg->weight = weight;
607
554
}
608
555
 
609
556
static void rcu_call_common(rcu_t *arg, atomic_count_t weight)
624
571
        preemption_disable();
625
572
        
626
573
        /*
 
574
         * This might be a race window. It remains closed when the reclaimer
 
575
         * runs on the same CPU and preemption_disable() is used.
 
576
         */
 
577
        list_append(&arg->link, &CPU->rcu.nextlist);
 
578
        
 
579
        /*
627
580
         * The semaphore could be imcremented after preemption_enable() since
628
581
         * we are communicating with a thread on the same CPU. However,
629
582
         * incrementing it here could save some context switches due to
630
583
         * spurious increments.
631
584
         */
632
 
        if (rcu_enqueue_callback(CPU, arg, &arg->next))
633
 
                semaphore_up(&CPU->rcu.queue_sema);
 
585
        semaphore_up(&CPU->rcu.queue_sema);
634
586
        
635
587
        atomic_postadd(&CPU->rcu.weight, weight);
636
588
        rcu_announce_qs();
639
591
 
640
592
void rcu_call(rcu_cb_t callb, rcu_t *arg, atomic_count_t weight)
641
593
{
642
 
        rcu_init_arg(callb, arg, weight, RCU_CB_DEFAULT);
 
594
        rcu_initialize(arg, RCU_CB_DEFAULT, callb, weight);
643
595
        rcu_call_common(arg, weight);
644
596
}
645
597
 
646
598
void rcu_call_excl(rcu_cb_t callb, rcu_t *arg, atomic_count_t weight)
647
599
{
648
 
        rcu_init_arg(callb, arg, weight, RCU_CB_EXCL);
 
600
        rcu_initialize(arg, RCU_CB_EXCL, callb, weight);
649
601
        rcu_call_common(arg, weight);
650
602
}
651
603
 
652
 
static void rcu_call_high_common(rcu_t *arg, atomic_count_t weight)
653
 
{
654
 
        cpu_t *rcp;
655
 
        cpu_t *aux;
656
 
        
657
 
        /*
658
 
         * Either we do all the work before (possible) CPU offlining, or we do
659
 
         * all of it *after* it. In the first case, either the dying
660
 
         * reclaimer thread, or the post mortem CPU event handler will
661
 
         * handle the callback. In the second case, we will correctly
662
 
         * detect that the CPU is offline and relay the callback to another
663
 
         * CPU.
664
 
         */
665
 
        memory_barrier();
666
 
        preemption_disable();
667
 
        for (rcp = CPU; (aux = rcp->rcu.relay_cpu) != NULL; rcp = aux);
668
 
        
669
 
        /*
670
 
         * Nesting prevents nested interrupt handlers from announcing a false
671
 
         * quiescent state in the window between list tail assignment
672
 
         * and list gap filling. We are modifying another CPU's list, so we
673
 
         * must not observe the counter until the gap in the list is filled.
674
 
         * Both assignments must reach global visibility before we allow the
675
 
         * possible interrupt handlers to report quiescent states.
676
 
         *
677
 
         * An infinite series of interrupts between the atomic swap and the
678
 
         * gap assignment could be catastrophic if rcu_nesting was not set
679
 
         * correctly (on all machines). Even interrupts after the assignment and
680
 
         * before the memory barrier could cause problems on weakly ordered
681
 
         * machines.
682
 
         *
683
 
         * Simply put, if this CPU modifies another CPU's nextlist, it must
684
 
         * not announce a quiescent state (context switch or observe the
685
 
         * counter) before the assignment reaches global visibility.
686
 
         */
687
 
        if (CPU == rcp) {
688
 
                if (rcu_enqueue_callback(CPU, arg, &arg->next))
689
 
                        semaphore_up(&CPU->rcu.queue_sema);
690
 
        } else {
691
 
                CPU->rcu.nesting++;
692
 
                
693
 
                if (rcu_enqueue_callback(rcp, arg, &arg->next))
694
 
                        semaphore_up(&rcp->rcu.queue_sema);
695
 
                else
696
 
                        write_barrier();
697
 
                
698
 
                CPU->rcu.nesting--;
699
 
        }
700
 
        
701
 
        atomic_postadd(&rcp->rcu.weight, weight);
702
 
        rcu_announce_qs();
703
 
        preemption_enable();
704
 
}
705
 
 
706
 
/*
707
 
 * In the following two functions, we use the rcu_relay_cpu field to transfer
708
 
 * the task to another CPU in cases when the current CPU is "in motion"
709
 
 * (about to be offlined).
710
 
 *
711
 
 * The key point: Standard threads are always moved away from offlined CPUs, so
712
 
 * they can never enqueue a callback on a CPU where no reclaimer runs.
713
 
 * Interrupts and interrupt threads can run even on offline CPUs in some special
714
 
 * cases. If such an interrupt thread enqueued a callback on its (offlined) CPU,
715
 
 * the callback would certainly leak, since no reclaimer thread would see it.
716
 
 *
717
 
 * This is why why interrupt threads should use the following two functions when
718
 
 * adding callbacks. Unlike their fast-path versions, they make sure that
719
 
 * the callback is always added to a working reclaimer's list.
720
 
 */
721
 
 
722
 
void rcu_call_high(rcu_cb_t callb, rcu_t *arg, atomic_count_t weight)
723
 
{
724
 
        rcu_init_arg(callb, arg, weight, RCU_CB_DEFAULT);
725
 
        rcu_call_high_common(arg, weight);
726
 
}
727
 
 
728
 
void rcu_call_excl_high(rcu_cb_t callb, rcu_t *arg, atomic_count_t weight)
729
 
{
730
 
        rcu_init_arg(callb, arg, weight, RCU_CB_EXCL);
731
 
        rcu_call_high_common(arg, weight);
732
 
}
733
 
 
734
604
void rcu_synchronize(void)
735
605
{
736
606
        /*