From: Paul E. McKenney Date: Mon, 24 Apr 2017 23:02:09 +0000 (-0700) Subject: srcu: Expedited grace periods with reduced memory contention X-Git-Url: http://git.cdn.openwrt.org/?a=commitdiff_plain;h=1e9a038b7fe9a8c10ef1238f4e695d5fbe0dd594;p=openwrt%2Fstaging%2Fblogic.git srcu: Expedited grace periods with reduced memory contention Commit f60d231a87c5 ("srcu: Crude control of expedited grace periods") introduced a per-srcu_struct atomic counter to track outstanding requests for grace periods. This works, but represents a memory-contention bottleneck. This commit therefore uses the srcu_node combining tree to remove this bottleneck. This commit adds new ->srcu_gp_seq_needed_exp fields to the srcu_data, srcu_node, and srcu_struct structures, which track the farthest-in-the-future grace period that must be expedited, which in turn requires that all nearer-term grace periods also be expedited. Requests for expediting start with the srcu_data structure, run up through the srcu_node tree, and end at the srcu_struct structure. Note that it may be necessary to expedite a grace period that just now started, and this is handled by a new srcu_funnel_exp_start() function, which is invoked when the grace period itself is already in its way, but when that grace period was not marked as expedited. A new srcu_get_delay() function returns zero if there is at least one expedited SRCU grace period in flight, or SRCU_INTERVAL otherwise. This function is used to calculate delays: Normal grace periods are allowed to extend in order to cover more requests with a given grace-period computation, which decreases per-request overhead. Signed-off-by: Paul E. McKenney Tested-by: Mike Galbraith --- diff --git a/include/linux/srcutree.h b/include/linux/srcutree.h index 3865717df124..86df48d3e97b 100644 --- a/include/linux/srcutree.h +++ b/include/linux/srcutree.h @@ -43,6 +43,7 @@ struct srcu_data { spinlock_t lock ____cacheline_internodealigned_in_smp; struct rcu_segcblist srcu_cblist; /* List of callbacks.*/ unsigned long srcu_gp_seq_needed; /* Furthest future GP needed. */ + unsigned long srcu_gp_seq_needed_exp; /* Furthest future exp GP. */ bool srcu_cblist_invoking; /* Invoking these CBs? */ struct delayed_work work; /* Context for CB invoking. */ struct rcu_head srcu_barrier_head; /* For srcu_barrier() use. */ @@ -63,6 +64,7 @@ struct srcu_node { /* is > ->srcu_gq_seq. */ unsigned long srcu_data_have_cbs[4]; /* Which srcu_data structs */ /* have CBs for given GP? */ + unsigned long srcu_gp_seq_needed_exp; /* Furthest future exp GP. */ struct srcu_node *srcu_parent; /* Next up in tree. */ int grplo; /* Least CPU for node. */ int grphi; /* Biggest CPU for node. */ @@ -81,7 +83,7 @@ struct srcu_struct { unsigned int srcu_idx; /* Current rdr array element. */ unsigned long srcu_gp_seq; /* Grace-period seq #. */ unsigned long srcu_gp_seq_needed; /* Latest gp_seq needed. */ - atomic_t srcu_exp_cnt; /* # ongoing expedited GPs. */ + unsigned long srcu_gp_seq_needed_exp; /* Furthest future exp GP. */ struct srcu_data __percpu *sda; /* Per-CPU srcu_data array. */ unsigned long srcu_barrier_seq; /* srcu_barrier seq #. */ struct mutex srcu_barrier_mutex; /* Serialize barrier ops. */ diff --git a/kernel/rcu/srcutree.c b/kernel/rcu/srcutree.c index 72b6cce5f591..4b98e6f45166 100644 --- a/kernel/rcu/srcutree.c +++ b/kernel/rcu/srcutree.c @@ -72,6 +72,7 @@ static void init_srcu_struct_nodes(struct srcu_struct *sp, bool is_static) snp->srcu_have_cbs[i] = 0; snp->srcu_data_have_cbs[i] = 0; } + snp->srcu_gp_seq_needed_exp = 0; snp->grplo = -1; snp->grphi = -1; if (snp == &sp->node[0]) { @@ -102,6 +103,7 @@ static void init_srcu_struct_nodes(struct srcu_struct *sp, bool is_static) rcu_segcblist_init(&sdp->srcu_cblist); sdp->srcu_cblist_invoking = false; sdp->srcu_gp_seq_needed = sp->srcu_gp_seq; + sdp->srcu_gp_seq_needed_exp = sp->srcu_gp_seq; sdp->mynode = &snp_first[cpu / levelspread[level]]; for (snp = sdp->mynode; snp != NULL; snp = snp->srcu_parent) { if (snp->grplo < 0) @@ -135,7 +137,6 @@ static int init_srcu_struct_fields(struct srcu_struct *sp, bool is_static) mutex_init(&sp->srcu_gp_mutex); sp->srcu_idx = 0; sp->srcu_gp_seq = 0; - atomic_set(&sp->srcu_exp_cnt, 0); sp->srcu_barrier_seq = 0; mutex_init(&sp->srcu_barrier_mutex); atomic_set(&sp->srcu_barrier_cpu_cnt, 0); @@ -143,6 +144,7 @@ static int init_srcu_struct_fields(struct srcu_struct *sp, bool is_static) if (!is_static) sp->sda = alloc_percpu(struct srcu_data); init_srcu_struct_nodes(sp, is_static); + sp->srcu_gp_seq_needed_exp = 0; smp_store_release(&sp->srcu_gp_seq_needed, 0); /* Init done. */ return sp->sda ? 0 : -ENOMEM; } @@ -307,6 +309,18 @@ static bool srcu_readers_active(struct srcu_struct *sp) #define SRCU_INTERVAL 1 +/* + * Return grace-period delay, zero if there are expedited grace + * periods pending, SRCU_INTERVAL otherwise. + */ +static unsigned long srcu_get_delay(struct srcu_struct *sp) +{ + if (ULONG_CMP_LT(READ_ONCE(sp->srcu_gp_seq), + READ_ONCE(sp->srcu_gp_seq_needed_exp))) + return 0; + return SRCU_INTERVAL; +} + /** * cleanup_srcu_struct - deconstruct a sleep-RCU structure * @sp: structure to clean up. @@ -318,7 +332,8 @@ void cleanup_srcu_struct(struct srcu_struct *sp) { int cpu; - WARN_ON_ONCE(atomic_read(&sp->srcu_exp_cnt)); + if (WARN_ON(!srcu_get_delay(sp))) + return; /* Leakage unless caller handles error. */ if (WARN_ON(srcu_readers_active(sp))) return; /* Leakage unless caller handles error. */ flush_delayed_work(&sp->work); @@ -444,15 +459,14 @@ static void srcu_schedule_cbs_sdp(struct srcu_data *sdp, unsigned long delay) * schedule this invocation on the corresponding CPUs. */ static void srcu_schedule_cbs_snp(struct srcu_struct *sp, struct srcu_node *snp, - unsigned long mask) + unsigned long mask, unsigned long delay) { int cpu; for (cpu = snp->grplo; cpu <= snp->grphi; cpu++) { if (!(mask & (1 << (cpu - snp->grplo)))) continue; - srcu_schedule_cbs_sdp(per_cpu_ptr(sp->sda, cpu), - atomic_read(&sp->srcu_exp_cnt) ? 0 : SRCU_INTERVAL); + srcu_schedule_cbs_sdp(per_cpu_ptr(sp->sda, cpu), delay); } } @@ -467,6 +481,7 @@ static void srcu_schedule_cbs_snp(struct srcu_struct *sp, struct srcu_node *snp, */ static void srcu_gp_end(struct srcu_struct *sp) { + unsigned long cbdelay; bool cbs; unsigned long gpseq; int idx; @@ -481,8 +496,11 @@ static void srcu_gp_end(struct srcu_struct *sp) spin_lock_irq(&sp->gp_lock); idx = rcu_seq_state(sp->srcu_gp_seq); WARN_ON_ONCE(idx != SRCU_STATE_SCAN2); + cbdelay = srcu_get_delay(sp); rcu_seq_end(&sp->srcu_gp_seq); gpseq = rcu_seq_current(&sp->srcu_gp_seq); + if (ULONG_CMP_LT(sp->srcu_gp_seq_needed_exp, gpseq)) + sp->srcu_gp_seq_needed_exp = gpseq; spin_unlock_irq(&sp->gp_lock); mutex_unlock(&sp->srcu_gp_mutex); /* A new grace period can start at this point. But only one. */ @@ -497,12 +515,14 @@ static void srcu_gp_end(struct srcu_struct *sp) cbs = snp->srcu_have_cbs[idx] == gpseq; snp->srcu_have_cbs[idx] = gpseq; rcu_seq_set_state(&snp->srcu_have_cbs[idx], 1); + if (ULONG_CMP_LT(snp->srcu_gp_seq_needed_exp, gpseq)) + snp->srcu_gp_seq_needed_exp = gpseq; mask = snp->srcu_data_have_cbs[idx]; snp->srcu_data_have_cbs[idx] = 0; spin_unlock_irq(&snp->lock); if (cbs) { smp_mb(); /* GP end before CB invocation. */ - srcu_schedule_cbs_snp(sp, snp, mask); + srcu_schedule_cbs_snp(sp, snp, mask, cbdelay); } } @@ -517,15 +537,43 @@ static void srcu_gp_end(struct srcu_struct *sp) srcu_gp_start(sp); spin_unlock_irq(&sp->gp_lock); /* Throttle expedited grace periods: Should be rare! */ - srcu_reschedule(sp, atomic_read(&sp->srcu_exp_cnt) && - rcu_seq_ctr(gpseq) & 0xf - ? 0 - : SRCU_INTERVAL); + srcu_reschedule(sp, rcu_seq_ctr(gpseq) & 0x3ff + ? 0 : SRCU_INTERVAL); } else { spin_unlock_irq(&sp->gp_lock); } } +/* + * Funnel-locking scheme to scalably mediate many concurrent expedited + * grace-period requests. This function is invoked for the first known + * expedited request for a grace period that has already been requested, + * but without expediting. To start a completely new grace period, + * whether expedited or not, use srcu_funnel_gp_start() instead. + */ +static void srcu_funnel_exp_start(struct srcu_struct *sp, struct srcu_node *snp, + unsigned long s) +{ + unsigned long flags; + + for (; snp != NULL; snp = snp->srcu_parent) { + if (rcu_seq_done(&sp->srcu_gp_seq, s) || + ULONG_CMP_GE(READ_ONCE(snp->srcu_gp_seq_needed_exp), s)) + return; + spin_lock_irqsave(&snp->lock, flags); + if (ULONG_CMP_GE(snp->srcu_gp_seq_needed_exp, s)) { + spin_unlock_irqrestore(&snp->lock, flags); + return; + } + WRITE_ONCE(snp->srcu_gp_seq_needed_exp, s); + spin_unlock_irqrestore(&snp->lock, flags); + } + spin_lock_irqsave(&sp->gp_lock, flags); + if (!ULONG_CMP_LT(sp->srcu_gp_seq_needed_exp, s)) + sp->srcu_gp_seq_needed_exp = s; + spin_unlock_irqrestore(&sp->gp_lock, flags); +} + /* * Funnel-locking scheme to scalably mediate many concurrent grace-period * requests. The winner has to do the work of actually starting grace @@ -533,9 +581,8 @@ static void srcu_gp_end(struct srcu_struct *sp) * number is recorded on at least their leaf srcu_node structure, or they * must take steps to invoke their own callbacks. */ -static void srcu_funnel_gp_start(struct srcu_struct *sp, - struct srcu_data *sdp, - unsigned long s) +static void srcu_funnel_gp_start(struct srcu_struct *sp, struct srcu_data *sdp, + unsigned long s, bool do_norm) { unsigned long flags; int idx = rcu_seq_ctr(s) % ARRAY_SIZE(sdp->mynode->srcu_have_cbs); @@ -554,13 +601,20 @@ static void srcu_funnel_gp_start(struct srcu_struct *sp, spin_unlock_irqrestore(&snp->lock, flags); if (snp == sdp->mynode && snp_seq != s) { smp_mb(); /* CBs after GP! */ - srcu_schedule_cbs_sdp(sdp, 0); + srcu_schedule_cbs_sdp(sdp, do_norm + ? SRCU_INTERVAL + : 0); + return; } + if (!do_norm) + srcu_funnel_exp_start(sp, snp, s); return; } snp->srcu_have_cbs[idx] = s; if (snp == sdp->mynode) snp->srcu_data_have_cbs[idx] |= sdp->grpmask; + if (!do_norm && ULONG_CMP_LT(snp->srcu_gp_seq_needed_exp, s)) + snp->srcu_gp_seq_needed_exp = s; spin_unlock_irqrestore(&snp->lock, flags); } @@ -573,6 +627,8 @@ static void srcu_funnel_gp_start(struct srcu_struct *sp, */ smp_store_release(&sp->srcu_gp_seq_needed, s); /*^^^*/ } + if (!do_norm && ULONG_CMP_LT(sp->srcu_gp_seq_needed_exp, s)) + sp->srcu_gp_seq_needed_exp = s; /* If grace period not already done and none in progress, start it. */ if (!rcu_seq_done(&sp->srcu_gp_seq, s) && @@ -580,9 +636,7 @@ static void srcu_funnel_gp_start(struct srcu_struct *sp, WARN_ON_ONCE(ULONG_CMP_GE(sp->srcu_gp_seq, sp->srcu_gp_seq_needed)); srcu_gp_start(sp); queue_delayed_work(system_power_efficient_wq, &sp->work, - atomic_read(&sp->srcu_exp_cnt) - ? 0 - : SRCU_INTERVAL); + srcu_get_delay(sp)); } spin_unlock_irqrestore(&sp->gp_lock, flags); } @@ -597,7 +651,7 @@ static bool try_check_zero(struct srcu_struct *sp, int idx, int trycount) for (;;) { if (srcu_readers_active_idx_check(sp, idx)) return true; - if (--trycount + !!atomic_read(&sp->srcu_exp_cnt) <= 0) + if (--trycount + !srcu_get_delay(sp) <= 0) return false; udelay(SRCU_RETRY_CHECK_DELAY); } @@ -650,10 +704,11 @@ static void srcu_flip(struct srcu_struct *sp) * srcu_read_lock(), and srcu_read_unlock() that are all passed the same * srcu_struct structure. */ -void call_srcu(struct srcu_struct *sp, struct rcu_head *rhp, - rcu_callback_t func) +void __call_srcu(struct srcu_struct *sp, struct rcu_head *rhp, + rcu_callback_t func, bool do_norm) { unsigned long flags; + bool needexp = false; bool needgp = false; unsigned long s; struct srcu_data *sdp; @@ -672,16 +727,28 @@ void call_srcu(struct srcu_struct *sp, struct rcu_head *rhp, sdp->srcu_gp_seq_needed = s; needgp = true; } + if (!do_norm && ULONG_CMP_LT(sdp->srcu_gp_seq_needed_exp, s)) { + sdp->srcu_gp_seq_needed_exp = s; + needexp = true; + } spin_unlock_irqrestore(&sdp->lock, flags); if (needgp) - srcu_funnel_gp_start(sp, sdp, s); + srcu_funnel_gp_start(sp, sdp, s, do_norm); + else if (needexp) + srcu_funnel_exp_start(sp, sdp->mynode, s); +} + +void call_srcu(struct srcu_struct *sp, struct rcu_head *rhp, + rcu_callback_t func) +{ + __call_srcu(sp, rhp, func, true); } EXPORT_SYMBOL_GPL(call_srcu); /* * Helper function for synchronize_srcu() and synchronize_srcu_expedited(). */ -static void __synchronize_srcu(struct srcu_struct *sp) +static void __synchronize_srcu(struct srcu_struct *sp, bool do_norm) { struct rcu_synchronize rcu; @@ -697,7 +764,7 @@ static void __synchronize_srcu(struct srcu_struct *sp) check_init_srcu_struct(sp); init_completion(&rcu.completion); init_rcu_head_on_stack(&rcu.head); - call_srcu(sp, &rcu.head, wakeme_after_rcu); + __call_srcu(sp, &rcu.head, wakeme_after_rcu, do_norm); wait_for_completion(&rcu.completion); destroy_rcu_head_on_stack(&rcu.head); } @@ -714,18 +781,7 @@ static void __synchronize_srcu(struct srcu_struct *sp) */ void synchronize_srcu_expedited(struct srcu_struct *sp) { - bool do_norm = rcu_gp_is_normal(); - - check_init_srcu_struct(sp); - if (!do_norm) { - atomic_inc(&sp->srcu_exp_cnt); - smp_mb__after_atomic(); /* increment before GP. */ - } - __synchronize_srcu(sp); - if (!do_norm) { - smp_mb__before_atomic(); /* GP before decrement. */ - WARN_ON_ONCE(atomic_dec_return(&sp->srcu_exp_cnt) < 0); - } + __synchronize_srcu(sp, rcu_gp_is_normal()); } EXPORT_SYMBOL_GPL(synchronize_srcu_expedited); @@ -773,7 +829,7 @@ void synchronize_srcu(struct srcu_struct *sp) if (rcu_gp_is_expedited()) synchronize_srcu_expedited(sp); else - __synchronize_srcu(sp); + __synchronize_srcu(sp, true); } EXPORT_SYMBOL_GPL(synchronize_srcu); @@ -1008,14 +1064,13 @@ void process_srcu(struct work_struct *work) sp = container_of(work, struct srcu_struct, work.work); srcu_advance_state(sp); - srcu_reschedule(sp, atomic_read(&sp->srcu_exp_cnt) ? 0 : SRCU_INTERVAL); + srcu_reschedule(sp, srcu_get_delay(sp)); } EXPORT_SYMBOL_GPL(process_srcu); void srcutorture_get_gp_data(enum rcutorture_type test_type, - struct srcu_struct *sp, int *flags, - unsigned long *gpnum, - unsigned long *completed) + struct srcu_struct *sp, int *flags, + unsigned long *gpnum, unsigned long *completed) { if (test_type != SRCU_FLAVOR) return;