ACK/Cmnt: [PATCH 1/1] io-wq: split bounded and unbounded work into separate lists

Stefan Bader stefan.bader at canonical.com
Wed Apr 13 07:48:50 UTC 2022


On 06.04.22 18:19, Timo Aaltonen wrote:
> From: Jens Axboe <axboe at kernel.dk>
> 
> BugLink: https://bugs.launchpad.net/bugs/1952222
> 
> We've got a few issues that all boil down to the fact that we have one
> list of pending work items, yet two different types of workers to
> serve them. This causes some oddities around workers switching type and
> even hashed work vs regular work on the same bounded list.
> 
> Just separate them out cleanly, similarly to how we already do
> accounting of what is running. That provides a clean separation and
> removes some corner cases that can cause stalls when handling IO
> that is punted to io-wq.
> 
> Fixes: ecc53c48c13d ("io-wq: check max_worker limits if a worker transitions bound state")
> Signed-off-by: Jens Axboe <axboe at kernel.dk>
> (backported from commit f95dc207b93da9c88ddbb7741ec3730c6657b88e; minor adjustments)
> Signed-off-by: Timo Aaltonen <timo.aaltonen at canonical.com>
Acked-by: Stefan Bader <stefan.bader at canonical.com>
> ---

Assuming this is more or less tested via oem and can be double checked when it 
lands in 5.13 kernels.

-Stefan

>   fs/io-wq.c | 158 +++++++++++++++++++++++------------------------------
>   1 file changed, 69 insertions(+), 89 deletions(-)
> 
> diff --git a/fs/io-wq.c b/fs/io-wq.c
> index ba7aaf2b95d0..bded284a56d0 100644
> --- a/fs/io-wq.c
> +++ b/fs/io-wq.c
> @@ -34,7 +34,7 @@ enum {
>   };
>   
>   enum {
> -	IO_WQE_FLAG_STALLED	= 1,	/* stalled on hash */
> +	IO_ACCT_STALLED_BIT	= 0,	/* stalled on hash */
>   };
>   
>   /*
> @@ -73,25 +73,24 @@ struct io_wqe_acct {
>   	unsigned max_workers;
>   	int index;
>   	atomic_t nr_running;
> +	struct io_wq_work_list work_list;
> +	unsigned long flags;
>   };
>   
>   enum {
>   	IO_WQ_ACCT_BOUND,
>   	IO_WQ_ACCT_UNBOUND,
> +	IO_WQ_ACCT_NR,
>   };
>   
>   /*
>    * Per-node worker thread pool
>    */
>   struct io_wqe {
> -	struct {
> -		raw_spinlock_t lock;
> -		struct io_wq_work_list work_list;
> -		unsigned flags;
> -	} ____cacheline_aligned_in_smp;
> +	raw_spinlock_t lock;
> +	struct io_wqe_acct acct[2];
>   
>   	int node;
> -	struct io_wqe_acct acct[2];
>   
>   	struct hlist_nulls_head free_list;
>   	struct list_head all_list;
> @@ -196,11 +195,10 @@ static void io_worker_exit(struct io_worker *worker)
>   	do_exit(0);
>   }
>   
> -static inline bool io_wqe_run_queue(struct io_wqe *wqe)
> -	__must_hold(wqe->lock)
> +static inline bool io_acct_run_queue(struct io_wqe_acct *acct)
>   {
> -	if (!wq_list_empty(&wqe->work_list) &&
> -	    !(wqe->flags & IO_WQE_FLAG_STALLED))
> +	if (!wq_list_empty(&acct->work_list) &&
> +	    !test_bit(IO_ACCT_STALLED_BIT, &acct->flags))
>   		return true;
>   	return false;
>   }
> @@ -209,7 +207,8 @@ static inline bool io_wqe_run_queue(struct io_wqe *wqe)
>    * Check head of free list for an available worker. If one isn't available,
>    * caller must create one.
>    */
> -static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
> +static bool io_wqe_activate_free_worker(struct io_wqe *wqe,
> +					struct io_wqe_acct *acct)
>   	__must_hold(RCU)
>   {
>   	struct hlist_nulls_node *n;
> @@ -223,6 +222,10 @@ static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
>   	hlist_nulls_for_each_entry_rcu(worker, n, &wqe->free_list, nulls_node) {
>   		if (!io_worker_get(worker))
>   			continue;
> +		if (io_wqe_get_acct(worker) != acct) {
> +			io_worker_release(worker);
> +			continue;
> +		}
>   		if (wake_up_process(worker->task)) {
>   			io_worker_release(worker);
>   			return true;
> @@ -341,7 +344,7 @@ static void io_wqe_dec_running(struct io_worker *worker)
>   	if (!(worker->flags & IO_WORKER_F_UP))
>   		return;
>   
> -	if (atomic_dec_and_test(&acct->nr_running) && io_wqe_run_queue(wqe)) {
> +	if (atomic_dec_and_test(&acct->nr_running) && io_acct_run_queue(acct)) {
>   		atomic_inc(&acct->nr_running);
>   		atomic_inc(&wqe->wq->worker_refs);
>   		io_queue_worker_create(wqe, worker, acct);
> @@ -356,29 +359,10 @@ static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker,
>   			     struct io_wq_work *work)
>   	__must_hold(wqe->lock)
>   {
> -	bool worker_bound, work_bound;
> -
> -	BUILD_BUG_ON((IO_WQ_ACCT_UNBOUND ^ IO_WQ_ACCT_BOUND) != 1);
> -
>   	if (worker->flags & IO_WORKER_F_FREE) {
>   		worker->flags &= ~IO_WORKER_F_FREE;
>   		hlist_nulls_del_init_rcu(&worker->nulls_node);
>   	}
> -
> -	/*
> -	 * If worker is moving from bound to unbound (or vice versa), then
> -	 * ensure we update the running accounting.
> -	 */
> -	worker_bound = (worker->flags & IO_WORKER_F_BOUND) != 0;
> -	work_bound = (work->flags & IO_WQ_WORK_UNBOUND) == 0;
> -	if (worker_bound != work_bound) {
> -		int index = work_bound ? IO_WQ_ACCT_UNBOUND : IO_WQ_ACCT_BOUND;
> -		io_wqe_dec_running(worker);
> -		worker->flags ^= IO_WORKER_F_BOUND;
> -		wqe->acct[index].nr_workers--;
> -		wqe->acct[index ^ 1].nr_workers++;
> -		io_wqe_inc_running(worker);
> -	 }
>   }
>   
>   /*
> @@ -420,44 +404,23 @@ static bool io_wait_on_hash(struct io_wqe *wqe, unsigned int hash)
>   	return ret;
>   }
>   
> -/*
> - * We can always run the work if the worker is currently the same type as
> - * the work (eg both are bound, or both are unbound). If they are not the
> - * same, only allow it if incrementing the worker count would be allowed.
> - */
> -static bool io_worker_can_run_work(struct io_worker *worker,
> -				   struct io_wq_work *work)
> -{
> -	struct io_wqe_acct *acct;
> -
> -	if (!(worker->flags & IO_WORKER_F_BOUND) !=
> -	    !(work->flags & IO_WQ_WORK_UNBOUND))
> -		return true;
> -
> -	/* not the same type, check if we'd go over the limit */
> -	acct = io_work_get_acct(worker->wqe, work);
> -	return acct->nr_workers < acct->max_workers;
> -}
> -
> -static struct io_wq_work *io_get_next_work(struct io_wqe *wqe,
> +static struct io_wq_work *io_get_next_work(struct io_wqe_acct *acct,
>   					   struct io_worker *worker)
>   	__must_hold(wqe->lock)
>   {
>   	struct io_wq_work_node *node, *prev;
>   	struct io_wq_work *work, *tail;
>   	unsigned int stall_hash = -1U;
> +	struct io_wqe *wqe = worker->wqe;
>   
> -	wq_list_for_each(node, prev, &wqe->work_list) {
> +	wq_list_for_each(node, prev, &acct->work_list) {
>   		unsigned int hash;
>   
>   		work = container_of(node, struct io_wq_work, list);
>   
> -		if (!io_worker_can_run_work(worker, work))
> -			break;
> -
>   		/* not hashed, can run anytime */
>   		if (!io_wq_is_hashed(work)) {
> -			wq_list_del(&wqe->work_list, node, prev);
> +			wq_list_del(&acct->work_list, node, prev);
>   			return work;
>   		}
>   
> @@ -468,7 +431,7 @@ static struct io_wq_work *io_get_next_work(struct io_wqe *wqe,
>   		/* hashed, can run if not already running */
>   		if (!test_and_set_bit(hash, &wqe->wq->hash->map)) {
>   			wqe->hash_tail[hash] = NULL;
> -			wq_list_cut(&wqe->work_list, &tail->list, prev);
> +			wq_list_cut(&acct->work_list, &tail->list, prev);
>   			return work;
>   		}
>   		if (stall_hash == -1U)
> @@ -484,12 +447,12 @@ static struct io_wq_work *io_get_next_work(struct io_wqe *wqe,
>   		 * Set this before dropping the lock to avoid racing with new
>   		 * work being added and clearing the stalled bit.
>   		 */
> -		wqe->flags |= IO_WQE_FLAG_STALLED;
> +		set_bit(IO_ACCT_STALLED_BIT, &acct->flags);
>   		raw_spin_unlock(&wqe->lock);
>   		unstalled = io_wait_on_hash(wqe, stall_hash);
>   		raw_spin_lock(&wqe->lock);
>   		if (unstalled) {
> -			wqe->flags &= ~IO_WQE_FLAG_STALLED;
> +			clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
>   			if (wq_has_sleeper(&wqe->wq->hash->wait))
>   				wake_up(&wqe->wq->hash->wait);
>   		}
> @@ -526,6 +489,7 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work);
>   static void io_worker_handle_work(struct io_worker *worker)
>   	__releases(wqe->lock)
>   {
> +	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
>   	struct io_wqe *wqe = worker->wqe;
>   	struct io_wq *wq = wqe->wq;
>   	bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state);
> @@ -540,7 +504,7 @@ static void io_worker_handle_work(struct io_worker *worker)
>   		 * can't make progress, any work completion or insertion will
>   		 * clear the stalled flag.
>   		 */
> -		work = io_get_next_work(wqe, worker);
> +		work = io_get_next_work(acct, worker);
>   		if (work)
>   			__io_worker_busy(wqe, worker, work);
>   
> @@ -576,7 +540,7 @@ static void io_worker_handle_work(struct io_worker *worker)
>   				/* serialize hash clear with wake_up() */
>   				spin_lock_irq(&wq->hash->wait.lock);
>   				clear_bit(hash, &wq->hash->map);
> -				wqe->flags &= ~IO_WQE_FLAG_STALLED;
> +				clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
>   				spin_unlock_irq(&wq->hash->wait.lock);
>   				if (wq_has_sleeper(&wq->hash->wait))
>   					wake_up(&wq->hash->wait);
> @@ -595,6 +559,7 @@ static void io_worker_handle_work(struct io_worker *worker)
>   static int io_wqe_worker(void *data)
>   {
>   	struct io_worker *worker = data;
> +	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
>   	struct io_wqe *wqe = worker->wqe;
>   	struct io_wq *wq = wqe->wq;
>   	char buf[TASK_COMM_LEN];
> @@ -610,7 +575,7 @@ static int io_wqe_worker(void *data)
>   		set_current_state(TASK_INTERRUPTIBLE);
>   loop:
>   		raw_spin_lock_irq(&wqe->lock);
> -		if (io_wqe_run_queue(wqe)) {
> +		if (io_acct_run_queue(acct)) {
>   			io_worker_handle_work(worker);
>   			goto loop;
>   		}
> @@ -636,7 +601,7 @@ static int io_wqe_worker(void *data)
>   
>   	if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
>   		raw_spin_lock_irq(&wqe->lock);
> -		if (!wq_list_empty(&wqe->work_list))
> +		if (!wq_list_empty(&acct->work_list))
>   			io_worker_handle_work(worker);
>   		else
>   			raw_spin_unlock_irq(&wqe->lock);
> @@ -782,12 +747,13 @@ static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe)
>   
>   static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
>   {
> +	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
>   	unsigned int hash;
>   	struct io_wq_work *tail;
>   
>   	if (!io_wq_is_hashed(work)) {
>   append:
> -		wq_list_add_tail(&work->list, &wqe->work_list);
> +		wq_list_add_tail(&work->list, &acct->work_list);
>   		return;
>   	}
>   
> @@ -797,7 +763,7 @@ static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
>   	if (!tail)
>   		goto append;
>   
> -	wq_list_add_after(&work->list, &tail->list, &wqe->work_list);
> +	wq_list_add_after(&work->list, &tail->list, &acct->work_list);
>   }
>   
>   static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
> @@ -819,10 +785,10 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
>   
>   	raw_spin_lock_irqsave(&wqe->lock, flags);
>   	io_wqe_insert_work(wqe, work);
> -	wqe->flags &= ~IO_WQE_FLAG_STALLED;
> +	clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
>   
>   	rcu_read_lock();
> -	do_create = !io_wqe_activate_free_worker(wqe);
> +	do_create = !io_wqe_activate_free_worker(wqe, acct);
>   	rcu_read_unlock();
>   
>   	raw_spin_unlock_irqrestore(&wqe->lock, flags);
> @@ -875,6 +841,7 @@ static inline void io_wqe_remove_pending(struct io_wqe *wqe,
>   					 struct io_wq_work *work,
>   					 struct io_wq_work_node *prev)
>   {
> +	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
>   	unsigned int hash = io_get_work_hash(work);
>   	struct io_wq_work *prev_work = NULL;
>   
> @@ -886,7 +853,7 @@ static inline void io_wqe_remove_pending(struct io_wqe *wqe,
>   		else
>   			wqe->hash_tail[hash] = NULL;
>   	}
> -	wq_list_del(&wqe->work_list, &work->list, prev);
> +	wq_list_del(&acct->work_list, &work->list, prev);
>   }
>   
>   static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
> @@ -895,22 +862,27 @@ static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
>   	struct io_wq_work_node *node, *prev;
>   	struct io_wq_work *work;
>   	unsigned long flags;
> +	int i;
>   
>   retry:
>   	raw_spin_lock_irqsave(&wqe->lock, flags);
> -	wq_list_for_each(node, prev, &wqe->work_list) {
> -		work = container_of(node, struct io_wq_work, list);
> -		if (!match->fn(work, match->data))
> -			continue;
> -		io_wqe_remove_pending(wqe, work, prev);
> -		raw_spin_unlock_irqrestore(&wqe->lock, flags);
> -		io_run_cancel(work, wqe);
> -		match->nr_pending++;
> -		if (!match->cancel_all)
> -			return;
> +	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
> +		struct io_wqe_acct *acct = io_get_acct(wqe, i == 0);
>   
> -		/* not safe to continue after unlock */
> -		goto retry;
> +		wq_list_for_each(node, prev, &acct->work_list) {
> +			work = container_of(node, struct io_wq_work, list);
> +			if (!match->fn(work, match->data))
> +				continue;
> +			io_wqe_remove_pending(wqe, work, prev);
> +			raw_spin_unlock_irqrestore(&wqe->lock, flags);
> +			io_run_cancel(work, wqe);
> +			match->nr_pending++;
> +			if (!match->cancel_all)
> +				return;
> +
> +			/* not safe to continue after unlock */
> +			goto retry;
> +		}
>   	}
>   	raw_spin_unlock_irqrestore(&wqe->lock, flags);
>   }
> @@ -971,18 +943,24 @@ static int io_wqe_hash_wake(struct wait_queue_entry *wait, unsigned mode,
>   			    int sync, void *key)
>   {
>   	struct io_wqe *wqe = container_of(wait, struct io_wqe, wait);
> +	int i;
>   
>   	list_del_init(&wait->entry);
>   
>   	rcu_read_lock();
> -	io_wqe_activate_free_worker(wqe);
> +	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
> +		struct io_wqe_acct *acct = &wqe->acct[i];
> +
> +		if (test_and_clear_bit(IO_ACCT_STALLED_BIT, &acct->flags))
> +			io_wqe_activate_free_worker(wqe, acct);
> +	}
>   	rcu_read_unlock();
>   	return 1;
>   }
>   
>   struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
>   {
> -	int ret = -ENOMEM, node;
> +	int ret = -ENOMEM, node, i;
>   	struct io_wq *wq;
>   
>   	if (WARN_ON_ONCE(!data->free_work || !data->do_work))
> @@ -1019,18 +997,20 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
>   			goto err;
>   		wq->wqes[node] = wqe;
>   		wqe->node = alloc_node;
> -		wqe->acct[IO_WQ_ACCT_BOUND].index = IO_WQ_ACCT_BOUND;
> -		wqe->acct[IO_WQ_ACCT_UNBOUND].index = IO_WQ_ACCT_UNBOUND;
>   		wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
> -		atomic_set(&wqe->acct[IO_WQ_ACCT_BOUND].nr_running, 0);
>   		wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
>   					task_rlimit(current, RLIMIT_NPROC);
> -		atomic_set(&wqe->acct[IO_WQ_ACCT_UNBOUND].nr_running, 0);
> -		wqe->wait.func = io_wqe_hash_wake;
>   		INIT_LIST_HEAD(&wqe->wait.entry);
> +		wqe->wait.func = io_wqe_hash_wake;
> +		for (i = 0; i < IO_WQ_ACCT_NR; i++) {
> +			struct io_wqe_acct *acct = &wqe->acct[i];
> +
> +			acct->index = i;
> +			atomic_set(&acct->nr_running, 0);
> +			INIT_WQ_LIST(&acct->work_list);
> +		}
>   		wqe->wq = wq;
>   		raw_spin_lock_init(&wqe->lock);
> -		INIT_WQ_LIST(&wqe->work_list);
>   		INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0);
>   		INIT_LIST_HEAD(&wqe->all_list);
>   	}

-------------- next part --------------
A non-text attachment was scrubbed...
Name: OpenPGP_signature
Type: application/pgp-signature
Size: 833 bytes
Desc: OpenPGP digital signature
URL: <https://lists.ubuntu.com/archives/kernel-team/attachments/20220413/ca2058fb/attachment-0001.sig>


More information about the kernel-team mailing list