/*
 * Two revisions of the same RCU code appear side by side in this excerpt;
 * where they differ, both are kept below. The first revision keeps the
 * per-CPU callback lists as intrusive singly-linked lists with explicit
 * tail pointers (curtail/nexttail) and an atomic weight counter; the
 * second uses the kernel's doubly-linked list_t and a plain weight field.
 * Lines missing from the excerpt are marked with elision comments.
 */

/* First revision: */
static void rcu_advance_callbacks(void)
{
    rcu_t *list = CPU->rcu.curlist;
    /* ... */
    rcu_t *failed_list = NULL;
    rcu_t **failed_end = &failed_list;
    atomic_count_t weight_sum = 0;

    /*
     * The following volatile cast makes sure that the workq
     * variable really exists. If it did not exist, the compiler
     * could (re-)read it from rcu_cb.workq on each access.
     * This would break the "enqueued in order" promise, since
     * callbacks could exist in the old queue and a new one [...]
     */
    workq_t *workq = RCU_VOLATILE(workq_t *, rcu_cb.workq);

    /*
     * This is where the nextlist from the last grace period is
     * actually terminated.
     */
    *CPU->rcu.curtail = NULL;

    do {
        /* Grab the 'next' pointer and weight... */
        next = list->next;
        weight_sum += atomic_get(&list->weight);

        switch (list->type) {
        /* ... */
            /*
             * This may fail under heavy load and/or memory
             * pressure. However, we do not want to sleep,
             * so that all the non-exclusive callbacks are
             * processed as soon as possible.
             */
            if (!workq_dispatch(workq, (workq_fn_t) list->func,
                list, WORKQ_FLAG_NOQUEUE)) {
                /* ... */
                failed_end = &list->next;
            }
        /* ... */
        }
    } while ((list = next) != NULL);

    /*
     * Reaching this place means that nothing failed above.
     * No need to handle failures.
     */
    /* ... */

    /*
     * The following loop executes iff a failure occurred above.
     * We do not attempt to enqueue the exclusive callbacks after
     * the first failure, since it could mess up their order.
     * Instead we just handle the standard callbacks expediently
     * and sleep waiting for the queue later on, with the correct
     * order of the exclusive callbacks.
     */
    for (; list; list = next) {
        /* ... */
        weight_sum += atomic_get(&list->weight);

        switch (list->type) {
        /* ... */
            failed_end = &list->next;
        /* ... */
        }
    }

    /*
     * If we could not enqueue some callbacks above, retry it now,
     * this time waiting as long as needed. We know for sure that
     * failed_list is not null when we get here. Note that all the
     * weight sum has already been computed above.
     */
    do {
        next = failed_list->next;
        (void) workq_dispatch(workq, (workq_fn_t) failed_list->func,
            failed_list, /* ... */);
    } while ((failed_list = next) != NULL);

    /*
     * Subtract the weight atomically.
     */
    atomic_postsub(&CPU->rcu.weight, weight_sum);

/* Second revision (only the body differs; the function head is shared): */
    /*
     * The following volatile cast makes sure that the workq
     * variable really exists. If it did not exist, the compiler
     * could (re-)read it from rcu_cb.workq on each access.
     * This would break the "enqueued in order" promise, since
     * callbacks could exist in the old queue and a new one [...]
     */
    // FIXME is this necessary if no CPU is in motion?
    workq_t *workq = RCU_VOLATILE(workq_t *, rcu_cb.workq);

    list_t *list = &CPU->rcu.curlist;
    atomic_count_t weight_sum = 0;

    while (!list_empty(list)) {
        rcu_t *item =
            list_get_instance(list_first(list), rcu_t, link);

        // FIXME is it OK to sum the weight if we can fail?
        weight_sum += item->weight;

        switch (item->type) {
        /* ... */
            /*
             * This may fail under heavy load and/or memory
             * pressure. However, we do not want to sleep,
             * so that all the non-exclusive callbacks are
             * processed as soon as possible.
             */
            if (!workq_dispatch(workq, (workq_fn_t) item->func,
                item, WORKQ_FLAG_NOQUEUE))
                /* ... */
        /* ... */
        }

        list_remove(&item->link);
    }

    /*
     * The following loops execute iff a failure occurred above.
     * We do not attempt to enqueue the exclusive callbacks after
     * the first failure, since it could mess up their order.
     * Instead we just handle the standard callbacks expediently
     * and sleep waiting for the queue later on, with the correct
     * order of the exclusive callbacks.
     */
    list_foreach(*list, link) {
        rcu_t *item = list_get_instance(link, rcu_t, link);
        /* ... */
        weight_sum += item->weight;

        if (item->type == RCU_CB_DEFAULT)
            /* ... */
    }

    /*
     * If we could not enqueue some callbacks above, retry it now,
     * this time waiting as long as needed. Note that all the
     * weight sum has already been computed above.
     */
    while (!list_empty(list)) {
        rcu_t *item =
            list_get_instance(list_first(list), rcu_t, link);
        /* ... */
        if (item->type == RCU_CB_EXCL)
            (void) workq_dispatch(workq, (workq_fn_t) item->func,
                item, WORKQ_FLAG_NONE);

        list_remove(&item->link);
    }

    /*
     * Subtract the weight atomically.
     */
    atomic_postsub(&CPU->rcu.weight, weight_sum);

/* Common to both revisions: */
    CPU->rcu.curlist = RCU_VOLATILE(rcu_t *, CPU->rcu.nextlist);

    if (CPU->rcu.curlist) {
        /* ... */
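/*
 * Illustrative sketch only (not part of the kernel sources above): the
 * two-pass strategy of rcu_advance_callbacks() reduced to a self-contained,
 * user-space C example. The types and helpers here (cb_t, try_dispatch(),
 * dispatch_blocking()) are hypothetical stand-ins for rcu_t and for
 * workq_dispatch() with and without WORKQ_FLAG_NOQUEUE. The point is the
 * shape of the algorithm: try every callback without sleeping, chain the
 * failures through a tail pointer so their order is preserved, then retry
 * only the failures, this time willing to block.
 */
#include <stdbool.h>
#include <stddef.h>

typedef struct cb {
    struct cb *next;
    void (*func)(struct cb *);
    size_t weight;
} cb_t;

/* Hypothetical non-blocking dispatch; may fail under load. */
static bool try_dispatch(cb_t *cb)
{
    static unsigned attempt;
    if ((attempt++ % 2) == 0)
        return false;           /* simulate an occasional failure */
    cb->func(cb);
    return true;
}

/* Hypothetical blocking dispatch; never fails. */
static void dispatch_blocking(cb_t *cb)
{
    cb->func(cb);
}

/* Returns the summed weight of all processed callbacks. */
static size_t advance_callbacks(cb_t *list)
{
    cb_t *failed_list = NULL;
    cb_t **failed_end = &failed_list;
    size_t weight_sum = 0;
    cb_t *next;

    /* Pass 1: never sleep; remember failed callbacks in order. */
    for (; list != NULL; list = next) {
        next = list->next;
        weight_sum += list->weight;
        if (!try_dispatch(list)) {
            *failed_end = list;
            failed_end = &list->next;
        }
    }
    *failed_end = NULL;

    /* Pass 2: retry only the failures, waiting as long as needed. */
    for (list = failed_list; list != NULL; list = next) {
        next = list->next;
        dispatch_blocking(list);
    }

    return weight_sum;
}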
    /* (tail of the preceding function; its body is not part of this excerpt) */
    preemption_enable();
}

/* Argument initialization in the first revision (atomic weight): */
static inline void rcu_init_arg(rcu_cb_t callb, rcu_t *arg,
    atomic_count_t weight, rcu_cb_type_t type)
{
    arg->type = type;
    arg->func = callb;
    atomic_set(&arg->weight, weight);
}

/* The same helper in the second revision (plain weight): */
static inline void rcu_initialize(rcu_t *arg, rcu_cb_type_t type,
    rcu_cb_t callb, atomic_count_t weight)
{
    arg->type = type;
    arg->func = callb;
    arg->weight = weight;
}

/*
 * The following must always be called with
 * - preemption disabled (when manipulating the current CPU's data).
 * - preemption disabled *and* rcu_nesting > 0
 *   (when manipulating another CPU's data).
 */
static inline int rcu_enqueue_callback(cpu_t *cpu, rcu_t *append,
    /* ... */)
{
    ASSERT(PREEMPTION_DISABLED);

    rcu_t **oldtail = atomic_swap_ptr(&cpu->rcu.nexttail,
        /* ... */);

    /*
     * This might be a race window. It remains closed when the reclaimer
     * runs on the same CPU and preemption_disable() is used. Otherwise
     * preemption_disable() does not close the window.
     *
     * Fortunately, preemption_disable() prevents a grace period from ending
     * too early. Every CPU (including this one) will issue a couple of
     * memory barriers (in both readers and reclaimers) before the grace
     * period can end. In particular, this function will have to have finished
     * (and its changes reached global visibility) before the enqueued
     * callback can be processed. So the uninitialized gap in the linked
     * list will have been filled by the time the reclaimer starts
     * processing the list.
     */
    /* ... */

    return (oldtail == &cpu->rcu.nextlist);
}
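/*
 * Illustrative sketch only: the tail-swap enqueue used by
 * rcu_enqueue_callback() above, rewritten as a self-contained user-space
 * example with C11 <stdatomic.h>. The queue and node types are hypothetical;
 * atomic_exchange_explicit() plays the role of atomic_swap_ptr(). The
 * interesting property is the transient "gap": between the exchange and the
 * store that fills the old tail slot, the list is momentarily disconnected,
 * which is exactly the window the surrounding preemption/rcu_nesting
 * discipline has to keep harmless.
 */
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

typedef struct node {
    struct node *_Atomic next;
} node_t;

typedef struct {
    node_t *_Atomic head;           /* first pending node (or NULL) */
    node_t *_Atomic *_Atomic tail;  /* the slot the next node goes into */
} cb_queue_t;

static void cb_queue_init(cb_queue_t *q)
{
    atomic_init(&q->head, NULL);
    atomic_init(&q->tail, &q->head);
}

/*
 * Returns true iff the queue was empty before, i.e. the producer should
 * wake the consumer (compare the semaphore_up() calls elsewhere in this
 * listing).
 */
static bool cb_enqueue(cb_queue_t *q, node_t *n)
{
    atomic_store_explicit(&n->next, NULL, memory_order_relaxed);

    /* Claim the tail slot first... */
    node_t *_Atomic *oldtail = atomic_exchange_explicit(&q->tail,
        &n->next, memory_order_acq_rel);

    /*
     * ...then fill the gap. Until this store completes, a consumer
     * walking the list from head sees it end one node early.
     */
    atomic_store_explicit(oldtail, n, memory_order_release);

    return (oldtail == &q->head);
}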
static void rcu_call_common(rcu_t *arg, atomic_count_t weight)
/* ... (body not part of this excerpt) */

void rcu_call(rcu_cb_t callb, rcu_t *arg, atomic_count_t weight)
{
    /* First revision: */
    rcu_init_arg(callb, arg, weight, RCU_CB_DEFAULT);
    /* Second revision: */
    rcu_initialize(arg, RCU_CB_DEFAULT, callb, weight);

    rcu_call_common(arg, weight);
}

void rcu_call_excl(rcu_cb_t callb, rcu_t *arg, atomic_count_t weight)
{
    /* First revision: */
    rcu_init_arg(callb, arg, weight, RCU_CB_EXCL);
    /* Second revision: */
    rcu_initialize(arg, RCU_CB_EXCL, callb, weight);

    rcu_call_common(arg, weight);
}
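/*
 * Illustrative usage sketch only (nothing below appears in the original
 * file): how a writer is expected to use rcu_call(). The foo_t type and the
 * foo_unlink()/foo_destroy() helpers are hypothetical, and the sketch
 * assumes that rcu_cb_t takes the rcu_t pointer itself; the real callback
 * signature should be checked against the header. The pattern is the usual
 * one: embed an rcu_t in the protected object, unlink the object so new
 * readers cannot find it, and let the callback reclaim it after the grace
 * period has elapsed.
 */
typedef struct foo {
    /* ... payload ... */
    rcu_t rcu;              /* embedded callback descriptor */
} foo_t;

static void foo_reclaim(rcu_t *rcu)
{
    /* Recover the enclosing object from the embedded member. */
    foo_t *foo = (foo_t *) ((char *) rcu - offsetof(foo_t, rcu));
    foo_destroy(foo);       /* hypothetical destructor */
}

static void foo_remove(foo_t *foo)
{
    foo_unlink(foo);        /* hypothetical: hide it from new readers */

    /* The weight is accumulated in the per-CPU rcu.weight counter above;
     * 1 is used here as a nominal value. */
    rcu_call(foo_reclaim, &foo->rcu, 1);
}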
static void rcu_call_high_common(rcu_t *arg, atomic_count_t weight)
{
    /* ... */

    /*
     * Either we do all the work before (possible) CPU offlining, or we do
     * all of it *after* it. In the first case, either the dying
     * reclaimer thread or the post mortem CPU event handler will
     * handle the callback. In the second case, we will correctly
     * detect that the CPU is offline and relay the callback to another
     * CPU. [...]
     */
    preemption_disable();

    for (rcp = CPU; (aux = rcp->rcu.relay_cpu) != NULL; rcp = aux);

    /*
     * Nesting prevents nested interrupt handlers from announcing a false
     * quiescent state in the window between the list tail assignment
     * and the list gap filling. We are modifying another CPU's list, so we
     * must not observe the counter until the gap in the list is filled.
     * Both assignments must reach global visibility before we allow the
     * possible interrupt handlers to report quiescent states.
     *
     * An infinite series of interrupts between the atomic swap and the
     * gap assignment could be catastrophic if rcu_nesting was not set
     * correctly (on all machines). Even interrupts after the assignment and
     * before the memory barrier could cause problems on weakly ordered
     * machines. [...]
     *
     * Simply put, if this CPU modifies another CPU's nextlist, it must
     * not announce a quiescent state (context switch or observe the
     * counter) before the assignment reaches global visibility.
     */
    /* ... */
    if (rcu_enqueue_callback(CPU, arg, &arg->next))
        semaphore_up(&CPU->rcu.queue_sema);
    /* ... */
    if (rcu_enqueue_callback(rcp, arg, &arg->next))
        semaphore_up(&rcp->rcu.queue_sema);
    /* ... */
    atomic_postadd(&rcp->rcu.weight, weight);
    /* ... */
}
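/*
 * Illustrative sketch only: one arrangement of the pieces shown above that
 * is consistent with the comments, written out to make the race window
 * explicit. The rcu_nesting_up()/rcu_nesting_down() helpers, the
 * memory_barrier() placement and the missing branch structure are
 * assumptions; this is not the elided body of rcu_call_high_common().
 */
static void rcu_call_remote_sketch(rcu_t *arg, atomic_count_t weight)
{
    cpu_t *rcp;
    cpu_t *aux;

    preemption_disable();

    /*
     * Hypothetical helper: raise rcu_nesting so that nested interrupt
     * handlers do not report a quiescent state while the target CPU's
     * list may still contain the gap left by the tail swap.
     */
    rcu_nesting_up();

    /* Follow the relay chain to a CPU with a live reclaimer. */
    for (rcp = CPU; (aux = rcp->rcu.relay_cpu) != NULL; rcp = aux);

    if (rcu_enqueue_callback(rcp, arg, &arg->next))
        semaphore_up(&rcp->rcu.queue_sema);

    /* Both the tail swap and the gap fill must be globally visible
     * before a quiescent state may be announced again. The barrier
     * name is assumed. */
    memory_barrier();

    rcu_nesting_down();     /* hypothetical helper: drop rcu_nesting */

    atomic_postadd(&rcp->rcu.weight, weight);
    preemption_enable();
}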
/*
 * In the following two functions, we use the rcu_relay_cpu field to transfer
 * the task to another CPU in cases when the current CPU is "in motion"
 * (about to be offlined).
 *
 * The key point: Standard threads are always moved away from offlined CPUs, so
 * they can never enqueue a callback on a CPU where no reclaimer runs.
 * Interrupts and interrupt threads can run even on offline CPUs in some special
 * cases. If such an interrupt thread enqueued a callback on its (offlined) CPU,
 * the callback would certainly leak, since no reclaimer thread would see it.
 *
 * This is why interrupt threads should use the following two functions when
 * adding callbacks. Unlike their fast-path versions, they make sure that
 * the callback is always added to a working reclaimer's list.
 */
void rcu_call_high(rcu_cb_t callb, rcu_t *arg, atomic_count_t weight)
{
    rcu_init_arg(callb, arg, weight, RCU_CB_DEFAULT);
    rcu_call_high_common(arg, weight);
}

void rcu_call_excl_high(rcu_cb_t callb, rcu_t *arg, atomic_count_t weight)
{
    rcu_init_arg(callb, arg, weight, RCU_CB_EXCL);
    rcu_call_high_common(arg, weight);
}
void rcu_synchronize(void)
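/*
 * The body of rcu_synchronize() is not part of this excerpt. A common way
 * to build "wait for a grace period" on top of the callback API is sketched
 * below: enqueue a callback whose only job is to wake the caller, then
 * sleep until it has run. The sync_arg_t type, the exact semaphore calls
 * (semaphore_initialize(), semaphore_down()) and the assumption that
 * rcu_cb_t takes the rcu_t pointer are all illustrative, not necessarily
 * what this kernel actually does.
 */
typedef struct {
    rcu_t rcu;              /* must stay the first member for the cast below */
    semaphore_t sema;
} sync_arg_t;

static void rcu_synchronize_cb(rcu_t *rcu)
{
    sync_arg_t *s = (sync_arg_t *) rcu;
    semaphore_up(&s->sema);
}

static void rcu_synchronize_sketch(void)
{
    sync_arg_t s;

    semaphore_initialize(&s.sema, 0);
    rcu_call(rcu_synchronize_cb, &s.rcu, 1);

    /* The callback runs only after a full grace period has elapsed. */
    semaphore_down(&s.sema);
}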