ACK/cmnt: [PATCH 1/1] io-wq: split bounded and unbounded work into separate lists
Kleber Souza
kleber.souza at canonical.com
Wed Apr 13 10:13:38 UTC 2022
On 06.04.22 18:19, Timo Aaltonen wrote:
> From: Jens Axboe <axboe at kernel.dk>
>
> BugLink: https://bugs.launchpad.net/bugs/1952222
>
> We've got a few issues that all boil down to the fact that we have one
> list of pending work items, yet two different types of workers to
> serve them. This causes some oddities around workers switching type and
> even hashed work vs regular work on the same bounded list.
>
> Just separate them out cleanly, similarly to how we already do
> accounting of what is running. That provides a clean separation and
> removes some corner cases that can cause stalls when handling IO
> that is punted to io-wq.
>
> Fixes: ecc53c48c13d ("io-wq: check max_worker limits if a worker transitions bound state")
> Signed-off-by: Jens Axboe <axboe at kernel.dk>
> (backported from commit f95dc207b93da9c88ddbb7741ec3730c6657b88e; minor adjustments)

To follow the pattern, this provenance note should instead be written as something like:

(backported from commit f95dc207b93da9c88ddbb7741ec3730c6657b88e)
[ tjaalton: minor adjustments ]

This can be fixed up when applying the patch.

> Signed-off-by: Timo Aaltonen <timo.aaltonen at canonical.com>

Acked-by: Kleber Sacilotto de Souza <kleber.souza at canonical.com>

Thanks

> ---
> fs/io-wq.c | 158 +++++++++++++++++++++++------------------------------
> 1 file changed, 69 insertions(+), 89 deletions(-)
>
> diff --git a/fs/io-wq.c b/fs/io-wq.c
> index ba7aaf2b95d0..bded284a56d0 100644
> --- a/fs/io-wq.c
> +++ b/fs/io-wq.c
> @@ -34,7 +34,7 @@ enum {
> };
>
> enum {
> - IO_WQE_FLAG_STALLED = 1, /* stalled on hash */
> + IO_ACCT_STALLED_BIT = 0, /* stalled on hash */
> };
>
> /*
> @@ -73,25 +73,24 @@ struct io_wqe_acct {
> unsigned max_workers;
> int index;
> atomic_t nr_running;
> + struct io_wq_work_list work_list;
> + unsigned long flags;
> };
>
> enum {
> IO_WQ_ACCT_BOUND,
> IO_WQ_ACCT_UNBOUND,
> + IO_WQ_ACCT_NR,
> };
>
> /*
> * Per-node worker thread pool
> */
> struct io_wqe {
> - struct {
> - raw_spinlock_t lock;
> - struct io_wq_work_list work_list;
> - unsigned flags;
> - } ____cacheline_aligned_in_smp;
> + raw_spinlock_t lock;
> + struct io_wqe_acct acct[2];
>
> int node;
> - struct io_wqe_acct acct[2];
>
> struct hlist_nulls_head free_list;
> struct list_head all_list;
> @@ -196,11 +195,10 @@ static void io_worker_exit(struct io_worker *worker)
> do_exit(0);
> }
>
> -static inline bool io_wqe_run_queue(struct io_wqe *wqe)
> - __must_hold(wqe->lock)
> +static inline bool io_acct_run_queue(struct io_wqe_acct *acct)
> {
> - if (!wq_list_empty(&wqe->work_list) &&
> - !(wqe->flags & IO_WQE_FLAG_STALLED))
> + if (!wq_list_empty(&acct->work_list) &&
> + !test_bit(IO_ACCT_STALLED_BIT, &acct->flags))
> return true;
> return false;
> }
> @@ -209,7 +207,8 @@ static inline bool io_wqe_run_queue(struct io_wqe *wqe)
> * Check head of free list for an available worker. If one isn't available,
> * caller must create one.
> */
> -static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
> +static bool io_wqe_activate_free_worker(struct io_wqe *wqe,
> + struct io_wqe_acct *acct)
> __must_hold(RCU)
> {
> struct hlist_nulls_node *n;
> @@ -223,6 +222,10 @@ static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
> hlist_nulls_for_each_entry_rcu(worker, n, &wqe->free_list, nulls_node) {
> if (!io_worker_get(worker))
> continue;
> + if (io_wqe_get_acct(worker) != acct) {
> + io_worker_release(worker);
> + continue;
> + }
> if (wake_up_process(worker->task)) {
> io_worker_release(worker);
> return true;
> @@ -341,7 +344,7 @@ static void io_wqe_dec_running(struct io_worker *worker)
> if (!(worker->flags & IO_WORKER_F_UP))
> return;
>
> - if (atomic_dec_and_test(&acct->nr_running) && io_wqe_run_queue(wqe)) {
> + if (atomic_dec_and_test(&acct->nr_running) && io_acct_run_queue(acct)) {
> atomic_inc(&acct->nr_running);
> atomic_inc(&wqe->wq->worker_refs);
> io_queue_worker_create(wqe, worker, acct);
> @@ -356,29 +359,10 @@ static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker,
> struct io_wq_work *work)
> __must_hold(wqe->lock)
> {
> - bool worker_bound, work_bound;
> -
> - BUILD_BUG_ON((IO_WQ_ACCT_UNBOUND ^ IO_WQ_ACCT_BOUND) != 1);
> -
> if (worker->flags & IO_WORKER_F_FREE) {
> worker->flags &= ~IO_WORKER_F_FREE;
> hlist_nulls_del_init_rcu(&worker->nulls_node);
> }
> -
> - /*
> - * If worker is moving from bound to unbound (or vice versa), then
> - * ensure we update the running accounting.
> - */
> - worker_bound = (worker->flags & IO_WORKER_F_BOUND) != 0;
> - work_bound = (work->flags & IO_WQ_WORK_UNBOUND) == 0;
> - if (worker_bound != work_bound) {
> - int index = work_bound ? IO_WQ_ACCT_UNBOUND : IO_WQ_ACCT_BOUND;
> - io_wqe_dec_running(worker);
> - worker->flags ^= IO_WORKER_F_BOUND;
> - wqe->acct[index].nr_workers--;
> - wqe->acct[index ^ 1].nr_workers++;
> - io_wqe_inc_running(worker);
> - }
> }
>
> /*
> @@ -420,44 +404,23 @@ static bool io_wait_on_hash(struct io_wqe *wqe, unsigned int hash)
> return ret;
> }
>
> -/*
> - * We can always run the work if the worker is currently the same type as
> - * the work (eg both are bound, or both are unbound). If they are not the
> - * same, only allow it if incrementing the worker count would be allowed.
> - */
> -static bool io_worker_can_run_work(struct io_worker *worker,
> - struct io_wq_work *work)
> -{
> - struct io_wqe_acct *acct;
> -
> - if (!(worker->flags & IO_WORKER_F_BOUND) !=
> - !(work->flags & IO_WQ_WORK_UNBOUND))
> - return true;
> -
> - /* not the same type, check if we'd go over the limit */
> - acct = io_work_get_acct(worker->wqe, work);
> - return acct->nr_workers < acct->max_workers;
> -}
> -
> -static struct io_wq_work *io_get_next_work(struct io_wqe *wqe,
> +static struct io_wq_work *io_get_next_work(struct io_wqe_acct *acct,
> struct io_worker *worker)
> __must_hold(wqe->lock)
> {
> struct io_wq_work_node *node, *prev;
> struct io_wq_work *work, *tail;
> unsigned int stall_hash = -1U;
> + struct io_wqe *wqe = worker->wqe;
>
> - wq_list_for_each(node, prev, &wqe->work_list) {
> + wq_list_for_each(node, prev, &acct->work_list) {
> unsigned int hash;
>
> work = container_of(node, struct io_wq_work, list);
>
> - if (!io_worker_can_run_work(worker, work))
> - break;
> -
> /* not hashed, can run anytime */
> if (!io_wq_is_hashed(work)) {
> - wq_list_del(&wqe->work_list, node, prev);
> + wq_list_del(&acct->work_list, node, prev);
> return work;
> }
>
> @@ -468,7 +431,7 @@ static struct io_wq_work *io_get_next_work(struct io_wqe *wqe,
> /* hashed, can run if not already running */
> if (!test_and_set_bit(hash, &wqe->wq->hash->map)) {
> wqe->hash_tail[hash] = NULL;
> - wq_list_cut(&wqe->work_list, &tail->list, prev);
> + wq_list_cut(&acct->work_list, &tail->list, prev);
> return work;
> }
> if (stall_hash == -1U)
> @@ -484,12 +447,12 @@ static struct io_wq_work *io_get_next_work(struct io_wqe *wqe,
> * Set this before dropping the lock to avoid racing with new
> * work being added and clearing the stalled bit.
> */
> - wqe->flags |= IO_WQE_FLAG_STALLED;
> + set_bit(IO_ACCT_STALLED_BIT, &acct->flags);
> raw_spin_unlock(&wqe->lock);
> unstalled = io_wait_on_hash(wqe, stall_hash);
> raw_spin_lock(&wqe->lock);
> if (unstalled) {
> - wqe->flags &= ~IO_WQE_FLAG_STALLED;
> + clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
> if (wq_has_sleeper(&wqe->wq->hash->wait))
> wake_up(&wqe->wq->hash->wait);
> }
> @@ -526,6 +489,7 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work);
> static void io_worker_handle_work(struct io_worker *worker)
> __releases(wqe->lock)
> {
> + struct io_wqe_acct *acct = io_wqe_get_acct(worker);
> struct io_wqe *wqe = worker->wqe;
> struct io_wq *wq = wqe->wq;
> bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state);
> @@ -540,7 +504,7 @@ static void io_worker_handle_work(struct io_worker *worker)
> * can't make progress, any work completion or insertion will
> * clear the stalled flag.
> */
> - work = io_get_next_work(wqe, worker);
> + work = io_get_next_work(acct, worker);
> if (work)
> __io_worker_busy(wqe, worker, work);
>
> @@ -576,7 +540,7 @@ static void io_worker_handle_work(struct io_worker *worker)
> /* serialize hash clear with wake_up() */
> spin_lock_irq(&wq->hash->wait.lock);
> clear_bit(hash, &wq->hash->map);
> - wqe->flags &= ~IO_WQE_FLAG_STALLED;
> + clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
> spin_unlock_irq(&wq->hash->wait.lock);
> if (wq_has_sleeper(&wq->hash->wait))
> wake_up(&wq->hash->wait);
> @@ -595,6 +559,7 @@ static void io_worker_handle_work(struct io_worker *worker)
> static int io_wqe_worker(void *data)
> {
> struct io_worker *worker = data;
> + struct io_wqe_acct *acct = io_wqe_get_acct(worker);
> struct io_wqe *wqe = worker->wqe;
> struct io_wq *wq = wqe->wq;
> char buf[TASK_COMM_LEN];
> @@ -610,7 +575,7 @@ static int io_wqe_worker(void *data)
> set_current_state(TASK_INTERRUPTIBLE);
> loop:
> raw_spin_lock_irq(&wqe->lock);
> - if (io_wqe_run_queue(wqe)) {
> + if (io_acct_run_queue(acct)) {
> io_worker_handle_work(worker);
> goto loop;
> }
> @@ -636,7 +601,7 @@ static int io_wqe_worker(void *data)
>
> if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
> raw_spin_lock_irq(&wqe->lock);
> - if (!wq_list_empty(&wqe->work_list))
> + if (!wq_list_empty(&acct->work_list))
> io_worker_handle_work(worker);
> else
> raw_spin_unlock_irq(&wqe->lock);
> @@ -782,12 +747,13 @@ static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe)
>
> static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
> {
> + struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
> unsigned int hash;
> struct io_wq_work *tail;
>
> if (!io_wq_is_hashed(work)) {
> append:
> - wq_list_add_tail(&work->list, &wqe->work_list);
> + wq_list_add_tail(&work->list, &acct->work_list);
> return;
> }
>
> @@ -797,7 +763,7 @@ static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
> if (!tail)
> goto append;
>
> - wq_list_add_after(&work->list, &tail->list, &wqe->work_list);
> + wq_list_add_after(&work->list, &tail->list, &acct->work_list);
> }
>
> static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
> @@ -819,10 +785,10 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
>
> raw_spin_lock_irqsave(&wqe->lock, flags);
> io_wqe_insert_work(wqe, work);
> - wqe->flags &= ~IO_WQE_FLAG_STALLED;
> + clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
>
> rcu_read_lock();
> - do_create = !io_wqe_activate_free_worker(wqe);
> + do_create = !io_wqe_activate_free_worker(wqe, acct);
> rcu_read_unlock();
>
> raw_spin_unlock_irqrestore(&wqe->lock, flags);
> @@ -875,6 +841,7 @@ static inline void io_wqe_remove_pending(struct io_wqe *wqe,
> struct io_wq_work *work,
> struct io_wq_work_node *prev)
> {
> + struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
> unsigned int hash = io_get_work_hash(work);
> struct io_wq_work *prev_work = NULL;
>
> @@ -886,7 +853,7 @@ static inline void io_wqe_remove_pending(struct io_wqe *wqe,
> else
> wqe->hash_tail[hash] = NULL;
> }
> - wq_list_del(&wqe->work_list, &work->list, prev);
> + wq_list_del(&acct->work_list, &work->list, prev);
> }
>
> static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
> @@ -895,22 +862,27 @@ static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
> struct io_wq_work_node *node, *prev;
> struct io_wq_work *work;
> unsigned long flags;
> + int i;
>
> retry:
> raw_spin_lock_irqsave(&wqe->lock, flags);
> - wq_list_for_each(node, prev, &wqe->work_list) {
> - work = container_of(node, struct io_wq_work, list);
> - if (!match->fn(work, match->data))
> - continue;
> - io_wqe_remove_pending(wqe, work, prev);
> - raw_spin_unlock_irqrestore(&wqe->lock, flags);
> - io_run_cancel(work, wqe);
> - match->nr_pending++;
> - if (!match->cancel_all)
> - return;
> + for (i = 0; i < IO_WQ_ACCT_NR; i++) {
> + struct io_wqe_acct *acct = io_get_acct(wqe, i == 0);
>
> - /* not safe to continue after unlock */
> - goto retry;
> + wq_list_for_each(node, prev, &acct->work_list) {
> + work = container_of(node, struct io_wq_work, list);
> + if (!match->fn(work, match->data))
> + continue;
> + io_wqe_remove_pending(wqe, work, prev);
> + raw_spin_unlock_irqrestore(&wqe->lock, flags);
> + io_run_cancel(work, wqe);
> + match->nr_pending++;
> + if (!match->cancel_all)
> + return;
> +
> + /* not safe to continue after unlock */
> + goto retry;
> + }
> }
> raw_spin_unlock_irqrestore(&wqe->lock, flags);
> }
> @@ -971,18 +943,24 @@ static int io_wqe_hash_wake(struct wait_queue_entry *wait, unsigned mode,
> int sync, void *key)
> {
> struct io_wqe *wqe = container_of(wait, struct io_wqe, wait);
> + int i;
>
> list_del_init(&wait->entry);
>
> rcu_read_lock();
> - io_wqe_activate_free_worker(wqe);
> + for (i = 0; i < IO_WQ_ACCT_NR; i++) {
> + struct io_wqe_acct *acct = &wqe->acct[i];
> +
> + if (test_and_clear_bit(IO_ACCT_STALLED_BIT, &acct->flags))
> + io_wqe_activate_free_worker(wqe, acct);
> + }
> rcu_read_unlock();
> return 1;
> }
>
> struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
> {
> - int ret = -ENOMEM, node;
> + int ret = -ENOMEM, node, i;
> struct io_wq *wq;
>
> if (WARN_ON_ONCE(!data->free_work || !data->do_work))
> @@ -1019,18 +997,20 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
> goto err;
> wq->wqes[node] = wqe;
> wqe->node = alloc_node;
> - wqe->acct[IO_WQ_ACCT_BOUND].index = IO_WQ_ACCT_BOUND;
> - wqe->acct[IO_WQ_ACCT_UNBOUND].index = IO_WQ_ACCT_UNBOUND;
> wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
> - atomic_set(&wqe->acct[IO_WQ_ACCT_BOUND].nr_running, 0);
> wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
> task_rlimit(current, RLIMIT_NPROC);
> - atomic_set(&wqe->acct[IO_WQ_ACCT_UNBOUND].nr_running, 0);
> - wqe->wait.func = io_wqe_hash_wake;
> INIT_LIST_HEAD(&wqe->wait.entry);
> + wqe->wait.func = io_wqe_hash_wake;
> + for (i = 0; i < IO_WQ_ACCT_NR; i++) {
> + struct io_wqe_acct *acct = &wqe->acct[i];
> +
> + acct->index = i;
> + atomic_set(&acct->nr_running, 0);
> + INIT_WQ_LIST(&acct->work_list);
> + }
> wqe->wq = wq;
> raw_spin_lock_init(&wqe->lock);
> - INIT_WQ_LIST(&wqe->work_list);
> INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0);
> INIT_LIST_HEAD(&wqe->all_list);
> }
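
For anyone skimming the diff, the net data-structure change is fairly compact; a simplified sketch of the resulting layout (field names taken from the hunks above, unrelated members omitted) looks like:

    struct io_wqe_acct {
            unsigned nr_workers;
            unsigned max_workers;
            int index;
            atomic_t nr_running;
            struct io_wq_work_list work_list;  /* per-type pending list, moved from io_wqe */
            unsigned long flags;               /* holds IO_ACCT_STALLED_BIT,
                                                  replaces IO_WQE_FLAG_STALLED */
    };

    struct io_wqe {
            raw_spinlock_t lock;               /* no longer in a cacheline-aligned wrapper */
            struct io_wqe_acct acct[2];        /* IO_WQ_ACCT_BOUND / IO_WQ_ACCT_UNBOUND */
            /* ... remaining members unchanged ... */
    };

Each worker then only serves the pending list of its own accounting class, which is why io_acct_run_queue() and io_get_next_work() now operate on a struct io_wqe_acct rather than on the io_wqe as a whole.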