diff options
| author | Linus Torvalds <torvalds@linux-foundation.org> | 2025-03-26 17:56:00 -0700 |
|---|---|---|
| committer | Linus Torvalds <torvalds@linux-foundation.org> | 2025-03-26 17:56:00 -0700 |
| commit | 91928e0d3cc29789f4483bffee5f36218f23942b (patch) | |
| tree | f8b8137a158eb654ece809018caeaa1044824411 /io_uring | |
| parent | 1e1ba8d23dae91e6a9cfeb1236b02749e8a49ab3 (diff) | |
| parent | 0f3ebf2d4bc0296c61543b2a729151d89c60e1ec (diff) | |
| download | linux-91928e0d3cc29789f4483bffee5f36218f23942b.tar.gz linux-91928e0d3cc29789f4483bffee5f36218f23942b.tar.bz2 linux-91928e0d3cc29789f4483bffee5f36218f23942b.zip | |
Merge tag 'for-6.15/io_uring-20250322' of git://git.kernel.dk/linux
Pull io_uring updates from Jens Axboe:
"This is the first of the io_uring pull requests for the 6.15 merge
window, there will be others once the net tree has gone in. This
contains:
- Cleanup and unification of cancelation handling across various
request types.
- Improvement for bundles, supporting them both for incrementally
consumed buffers, and for non-multishot requests.
- Enable toggling of using iowait while waiting on io_uring events or
not. Unfortunately this is still tied with CPU frequency boosting
on short waits, as the scheduler side has not been very receptive
to splitting the (useless) iowait stat from the cpufreq implied
boost.
- Add support for kbuf nodes, enabling zero-copy support for the ublk
block driver.
- Various cleanups for resource node handling.
- Series greatly cleaning up the legacy provided (non-ring based)
buffers. For years, we've been pushing the ring provided buffers as
the way to go, and that is what people have been using. Reduce the
complexity and code associated with legacy provided buffers.
- Series cleaning up the compat handling.
- Series improving and cleaning up the recvmsg/sendmsg iovec and msg
handling.
- Series of cleanups for io-wq.
- Start adding a bunch of selftests. The liburing repository
generally carries feature and regression tests for everything, but
at least for ublk initially, we'll try and go the route of having
it in selftests as well. We'll see how this goes, might decide to
migrate more tests this way in the future.
- Various little cleanups and fixes"
* tag 'for-6.15/io_uring-20250322' of git://git.kernel.dk/linux: (108 commits)
selftests: ublk: add stripe target
selftests: ublk: simplify loop io completion
selftests: ublk: enable zero copy for null target
selftests: ublk: prepare for supporting stripe target
selftests: ublk: move common code into common.c
selftests: ublk: increase max buffer size to 1MB
selftests: ublk: add single sqe allocator helper
selftests: ublk: add generic_01 for verifying sequential IO order
selftests: ublk: fix starting ublk device
io_uring: enable toggle of iowait usage when waiting on CQEs
selftests: ublk: fix write cache implementation
selftests: ublk: add variable for user to not show test result
selftests: ublk: don't show `modprobe` failure
selftests: ublk: add one dependency header
io_uring/kbuf: enable bundles for incrementally consumed buffers
Revert "io_uring/rsrc: simplify the bvec iter count calculation"
selftests: ublk: improve test usability
selftests: ublk: add stress test for covering IO vs. killing ublk server
selftests: ublk: add one stress test for covering IO vs. removing device
selftests: ublk: load/unload ublk_drv when preparing & cleaning up tests
...
Diffstat (limited to 'io_uring')
| -rw-r--r-- | io_uring/alloc_cache.h | 6 | ||||
| -rw-r--r-- | io_uring/cancel.c | 42 | ||||
| -rw-r--r-- | io_uring/cancel.h | 8 | ||||
| -rw-r--r-- | io_uring/filetable.c | 2 | ||||
| -rw-r--r-- | io_uring/futex.c | 62 | ||||
| -rw-r--r-- | io_uring/io-wq.c | 230 | ||||
| -rw-r--r-- | io_uring/io-wq.h | 7 | ||||
| -rw-r--r-- | io_uring/io_uring.c | 249 | ||||
| -rw-r--r-- | io_uring/io_uring.h | 15 | ||||
| -rw-r--r-- | io_uring/kbuf.c | 200 | ||||
| -rw-r--r-- | io_uring/kbuf.h | 100 | ||||
| -rw-r--r-- | io_uring/msg_ring.c | 2 | ||||
| -rw-r--r-- | io_uring/net.c | 257 | ||||
| -rw-r--r-- | io_uring/nop.c | 18 | ||||
| -rw-r--r-- | io_uring/notif.c | 4 | ||||
| -rw-r--r-- | io_uring/opdef.c | 4 | ||||
| -rw-r--r-- | io_uring/opdef.h | 12 | ||||
| -rw-r--r-- | io_uring/poll.c | 18 | ||||
| -rw-r--r-- | io_uring/poll.h | 4 | ||||
| -rw-r--r-- | io_uring/rsrc.c | 246 | ||||
| -rw-r--r-- | io_uring/rsrc.h | 24 | ||||
| -rw-r--r-- | io_uring/rw.c | 200 | ||||
| -rw-r--r-- | io_uring/rw.h | 5 | ||||
| -rw-r--r-- | io_uring/splice.c | 3 | ||||
| -rw-r--r-- | io_uring/timeout.c | 16 | ||||
| -rw-r--r-- | io_uring/uring_cmd.c | 33 | ||||
| -rw-r--r-- | io_uring/waitid.c | 56 |
27 files changed, 971 insertions, 852 deletions
diff --git a/io_uring/alloc_cache.h b/io_uring/alloc_cache.h index 0dd17d8ba93a..7f68eff2e7f3 100644 --- a/io_uring/alloc_cache.h +++ b/io_uring/alloc_cache.h @@ -68,4 +68,10 @@ static inline void *io_cache_alloc(struct io_alloc_cache *cache, gfp_t gfp) return io_cache_alloc_new(cache, gfp); } +static inline void io_cache_free(struct io_alloc_cache *cache, void *obj) +{ + if (!io_alloc_cache_put(cache, obj)) + kfree(obj); +} + #endif diff --git a/io_uring/cancel.c b/io_uring/cancel.c index 484193567839..0870060bac7c 100644 --- a/io_uring/cancel.c +++ b/io_uring/cancel.c @@ -341,3 +341,45 @@ out: fput(file); return ret; } + +bool io_cancel_remove_all(struct io_ring_ctx *ctx, struct io_uring_task *tctx, + struct hlist_head *list, bool cancel_all, + bool (*cancel)(struct io_kiocb *)) +{ + struct hlist_node *tmp; + struct io_kiocb *req; + bool found = false; + + lockdep_assert_held(&ctx->uring_lock); + + hlist_for_each_entry_safe(req, tmp, list, hash_node) { + if (!io_match_task_safe(req, tctx, cancel_all)) + continue; + hlist_del_init(&req->hash_node); + if (cancel(req)) + found = true; + } + + return found; +} + +int io_cancel_remove(struct io_ring_ctx *ctx, struct io_cancel_data *cd, + unsigned int issue_flags, struct hlist_head *list, + bool (*cancel)(struct io_kiocb *)) +{ + struct hlist_node *tmp; + struct io_kiocb *req; + int nr = 0; + + io_ring_submit_lock(ctx, issue_flags); + hlist_for_each_entry_safe(req, tmp, list, hash_node) { + if (!io_cancel_req_match(req, cd)) + continue; + if (cancel(req)) + nr++; + if (!(cd->flags & IORING_ASYNC_CANCEL_ALL)) + break; + } + io_ring_submit_unlock(ctx, issue_flags); + return nr ?: -ENOENT; +} diff --git a/io_uring/cancel.h b/io_uring/cancel.h index bbfea2cd00ea..43e9bb74e9d1 100644 --- a/io_uring/cancel.h +++ b/io_uring/cancel.h @@ -24,6 +24,14 @@ int io_try_cancel(struct io_uring_task *tctx, struct io_cancel_data *cd, int io_sync_cancel(struct io_ring_ctx *ctx, void __user *arg); bool io_cancel_req_match(struct io_kiocb *req, struct io_cancel_data *cd); +bool io_cancel_remove_all(struct io_ring_ctx *ctx, struct io_uring_task *tctx, + struct hlist_head *list, bool cancel_all, + bool (*cancel)(struct io_kiocb *)); + +int io_cancel_remove(struct io_ring_ctx *ctx, struct io_cancel_data *cd, + unsigned int issue_flags, struct hlist_head *list, + bool (*cancel)(struct io_kiocb *)); + static inline bool io_cancel_match_sequence(struct io_kiocb *req, int sequence) { if (req->cancel_seq_set && sequence == req->work.cancel_seq) diff --git a/io_uring/filetable.c b/io_uring/filetable.c index dd8eeec97acf..a21660e3145a 100644 --- a/io_uring/filetable.c +++ b/io_uring/filetable.c @@ -68,7 +68,7 @@ static int io_install_fixed_file(struct io_ring_ctx *ctx, struct file *file, if (slot_index >= ctx->file_table.data.nr) return -EINVAL; - node = io_rsrc_node_alloc(IORING_RSRC_FILE); + node = io_rsrc_node_alloc(ctx, IORING_RSRC_FILE); if (!node) return -ENOMEM; diff --git a/io_uring/futex.c b/io_uring/futex.c index 43e2143255f5..0ea4820cd8ff 100644 --- a/io_uring/futex.c +++ b/io_uring/futex.c @@ -44,30 +44,28 @@ void io_futex_cache_free(struct io_ring_ctx *ctx) io_alloc_cache_free(&ctx->futex_cache, kfree); } -static void __io_futex_complete(struct io_kiocb *req, struct io_tw_state *ts) +static void __io_futex_complete(struct io_kiocb *req, io_tw_token_t tw) { req->async_data = NULL; hlist_del_init(&req->hash_node); - io_req_task_complete(req, ts); + io_req_task_complete(req, tw); } -static void io_futex_complete(struct io_kiocb *req, struct io_tw_state *ts) +static void io_futex_complete(struct io_kiocb *req, io_tw_token_t tw) { - struct io_futex_data *ifd = req->async_data; struct io_ring_ctx *ctx = req->ctx; - io_tw_lock(ctx, ts); - if (!io_alloc_cache_put(&ctx->futex_cache, ifd)) - kfree(ifd); - __io_futex_complete(req, ts); + io_tw_lock(ctx, tw); + io_cache_free(&ctx->futex_cache, req->async_data); + __io_futex_complete(req, tw); } -static void io_futexv_complete(struct io_kiocb *req, struct io_tw_state *ts) +static void io_futexv_complete(struct io_kiocb *req, io_tw_token_t tw) { struct io_futex *iof = io_kiocb_to_cmd(req, struct io_futex); struct futex_vector *futexv = req->async_data; - io_tw_lock(req->ctx, ts); + io_tw_lock(req->ctx, tw); if (!iof->futexv_unqueued) { int res; @@ -79,7 +77,7 @@ static void io_futexv_complete(struct io_kiocb *req, struct io_tw_state *ts) kfree(req->async_data); req->flags &= ~REQ_F_ASYNC_DATA; - __io_futex_complete(req, ts); + __io_futex_complete(req, tw); } static bool io_futexv_claim(struct io_futex *iof) @@ -90,7 +88,7 @@ static bool io_futexv_claim(struct io_futex *iof) return true; } -static bool __io_futex_cancel(struct io_ring_ctx *ctx, struct io_kiocb *req) +static bool __io_futex_cancel(struct io_kiocb *req) { /* futex wake already done or in progress */ if (req->opcode == IORING_OP_FUTEX_WAIT) { @@ -116,49 +114,13 @@ static bool __io_futex_cancel(struct io_ring_ctx *ctx, struct io_kiocb *req) int io_futex_cancel(struct io_ring_ctx *ctx, struct io_cancel_data *cd, unsigned int issue_flags) { - struct hlist_node *tmp; - struct io_kiocb *req; - int nr = 0; - - if (cd->flags & (IORING_ASYNC_CANCEL_FD|IORING_ASYNC_CANCEL_FD_FIXED)) - return -ENOENT; - - io_ring_submit_lock(ctx, issue_flags); - hlist_for_each_entry_safe(req, tmp, &ctx->futex_list, hash_node) { - if (req->cqe.user_data != cd->data && - !(cd->flags & IORING_ASYNC_CANCEL_ANY)) - continue; - if (__io_futex_cancel(ctx, req)) - nr++; - if (!(cd->flags & IORING_ASYNC_CANCEL_ALL)) - break; - } - io_ring_submit_unlock(ctx, issue_flags); - - if (nr) - return nr; - - return -ENOENT; + return io_cancel_remove(ctx, cd, issue_flags, &ctx->futex_list, __io_futex_cancel); } bool io_futex_remove_all(struct io_ring_ctx *ctx, struct io_uring_task *tctx, bool cancel_all) { - struct hlist_node *tmp; - struct io_kiocb *req; - bool found = false; - - lockdep_assert_held(&ctx->uring_lock); - - hlist_for_each_entry_safe(req, tmp, &ctx->futex_list, hash_node) { - if (!io_match_task_safe(req, tctx, cancel_all)) - continue; - hlist_del_init(&req->hash_node); - __io_futex_cancel(ctx, req); - found = true; - } - - return found; + return io_cancel_remove_all(ctx, tctx, &ctx->futex_list, cancel_all, __io_futex_cancel); } int io_futex_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) diff --git a/io_uring/io-wq.c b/io_uring/io-wq.c index 91019b4d0308..04a75d666195 100644 --- a/io_uring/io-wq.c +++ b/io_uring/io-wq.c @@ -30,7 +30,6 @@ enum { IO_WORKER_F_UP = 0, /* up and active */ IO_WORKER_F_RUNNING = 1, /* account as running */ IO_WORKER_F_FREE = 2, /* worker on free list */ - IO_WORKER_F_BOUND = 3, /* is doing bounded work */ }; enum { @@ -46,12 +45,12 @@ enum { */ struct io_worker { refcount_t ref; - int create_index; unsigned long flags; struct hlist_nulls_node nulls_node; struct list_head all_list; struct task_struct *task; struct io_wq *wq; + struct io_wq_acct *acct; struct io_wq_work *cur_work; raw_spinlock_t lock; @@ -77,10 +76,27 @@ struct io_worker { #define IO_WQ_NR_HASH_BUCKETS (1u << IO_WQ_HASH_ORDER) struct io_wq_acct { + /** + * Protects access to the worker lists. + */ + raw_spinlock_t workers_lock; + unsigned nr_workers; unsigned max_workers; - int index; atomic_t nr_running; + + /** + * The list of free workers. Protected by #workers_lock + * (write) and RCU (read). + */ + struct hlist_nulls_head free_list; + + /** + * The list of all workers. Protected by #workers_lock + * (write) and RCU (read). + */ + struct list_head all_list; + raw_spinlock_t lock; struct io_wq_work_list work_list; unsigned long flags; @@ -112,12 +128,6 @@ struct io_wq { struct io_wq_acct acct[IO_WQ_ACCT_NR]; - /* lock protects access to elements below */ - raw_spinlock_t lock; - - struct hlist_nulls_head free_list; - struct list_head all_list; - struct wait_queue_entry wait; struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS]; @@ -135,7 +145,7 @@ struct io_cb_cancel_data { bool cancel_all; }; -static bool create_io_worker(struct io_wq *wq, int index); +static bool create_io_worker(struct io_wq *wq, struct io_wq_acct *acct); static void io_wq_dec_running(struct io_worker *worker); static bool io_acct_cancel_pending_work(struct io_wq *wq, struct io_wq_acct *acct, @@ -160,14 +170,14 @@ static inline struct io_wq_acct *io_get_acct(struct io_wq *wq, bool bound) } static inline struct io_wq_acct *io_work_get_acct(struct io_wq *wq, - struct io_wq_work *work) + unsigned int work_flags) { - return io_get_acct(wq, !(atomic_read(&work->flags) & IO_WQ_WORK_UNBOUND)); + return io_get_acct(wq, !(work_flags & IO_WQ_WORK_UNBOUND)); } static inline struct io_wq_acct *io_wq_get_acct(struct io_worker *worker) { - return io_get_acct(worker->wq, test_bit(IO_WORKER_F_BOUND, &worker->flags)); + return worker->acct; } static void io_worker_ref_put(struct io_wq *wq) @@ -192,9 +202,9 @@ static void io_worker_cancel_cb(struct io_worker *worker) struct io_wq *wq = worker->wq; atomic_dec(&acct->nr_running); - raw_spin_lock(&wq->lock); + raw_spin_lock(&acct->workers_lock); acct->nr_workers--; - raw_spin_unlock(&wq->lock); + raw_spin_unlock(&acct->workers_lock); io_worker_ref_put(wq); clear_bit_unlock(0, &worker->create_state); io_worker_release(worker); @@ -213,6 +223,7 @@ static bool io_task_worker_match(struct callback_head *cb, void *data) static void io_worker_exit(struct io_worker *worker) { struct io_wq *wq = worker->wq; + struct io_wq_acct *acct = io_wq_get_acct(worker); while (1) { struct callback_head *cb = task_work_cancel_match(wq->task, @@ -226,11 +237,11 @@ static void io_worker_exit(struct io_worker *worker) io_worker_release(worker); wait_for_completion(&worker->ref_done); - raw_spin_lock(&wq->lock); + raw_spin_lock(&acct->workers_lock); if (test_bit(IO_WORKER_F_FREE, &worker->flags)) hlist_nulls_del_rcu(&worker->nulls_node); list_del_rcu(&worker->all_list); - raw_spin_unlock(&wq->lock); + raw_spin_unlock(&acct->workers_lock); io_wq_dec_running(worker); /* * this worker is a goner, clear ->worker_private to avoid any @@ -269,8 +280,7 @@ static inline bool io_acct_run_queue(struct io_wq_acct *acct) * Check head of free list for an available worker. If one isn't available, * caller must create one. */ -static bool io_wq_activate_free_worker(struct io_wq *wq, - struct io_wq_acct *acct) +static bool io_acct_activate_free_worker(struct io_wq_acct *acct) __must_hold(RCU) { struct hlist_nulls_node *n; @@ -281,13 +291,9 @@ static bool io_wq_activate_free_worker(struct io_wq *wq, * activate. If a given worker is on the free_list but in the process * of exiting, keep trying. */ - hlist_nulls_for_each_entry_rcu(worker, n, &wq->free_list, nulls_node) { + hlist_nulls_for_each_entry_rcu(worker, n, &acct->free_list, nulls_node) { if (!io_worker_get(worker)) continue; - if (io_wq_get_acct(worker) != acct) { - io_worker_release(worker); - continue; - } /* * If the worker is already running, it's either already * starting work or finishing work. In either case, if it does @@ -314,16 +320,16 @@ static bool io_wq_create_worker(struct io_wq *wq, struct io_wq_acct *acct) if (unlikely(!acct->max_workers)) pr_warn_once("io-wq is not configured for unbound workers"); - raw_spin_lock(&wq->lock); + raw_spin_lock(&acct->workers_lock); if (acct->nr_workers >= acct->max_workers) { - raw_spin_unlock(&wq->lock); + raw_spin_unlock(&acct->workers_lock); return true; } acct->nr_workers++; - raw_spin_unlock(&wq->lock); + raw_spin_unlock(&acct->workers_lock); atomic_inc(&acct->nr_running); atomic_inc(&wq->worker_refs); - return create_io_worker(wq, acct->index); + return create_io_worker(wq, acct); } static void io_wq_inc_running(struct io_worker *worker) @@ -343,16 +349,16 @@ static void create_worker_cb(struct callback_head *cb) worker = container_of(cb, struct io_worker, create_work); wq = worker->wq; - acct = &wq->acct[worker->create_index]; - raw_spin_lock(&wq->lock); + acct = worker->acct; + raw_spin_lock(&acct->workers_lock); if (acct->nr_workers < acct->max_workers) { acct->nr_workers++; do_create = true; } - raw_spin_unlock(&wq->lock); + raw_spin_unlock(&acct->workers_lock); if (do_create) { - create_io_worker(wq, worker->create_index); + create_io_worker(wq, acct); } else { atomic_dec(&acct->nr_running); io_worker_ref_put(wq); @@ -384,7 +390,6 @@ static bool io_queue_worker_create(struct io_worker *worker, atomic_inc(&wq->worker_refs); init_task_work(&worker->create_work, func); - worker->create_index = acct->index; if (!task_work_add(wq->task, &worker->create_work, TWA_SIGNAL)) { /* * EXIT may have been set after checking it above, check after @@ -430,31 +435,36 @@ static void io_wq_dec_running(struct io_worker *worker) * Worker will start processing some work. Move it to the busy list, if * it's currently on the freelist */ -static void __io_worker_busy(struct io_wq *wq, struct io_worker *worker) +static void __io_worker_busy(struct io_wq_acct *acct, struct io_worker *worker) { if (test_bit(IO_WORKER_F_FREE, &worker->flags)) { clear_bit(IO_WORKER_F_FREE, &worker->flags); - raw_spin_lock(&wq->lock); + raw_spin_lock(&acct->workers_lock); hlist_nulls_del_init_rcu(&worker->nulls_node); - raw_spin_unlock(&wq->lock); + raw_spin_unlock(&acct->workers_lock); } } /* * No work, worker going to sleep. Move to freelist. */ -static void __io_worker_idle(struct io_wq *wq, struct io_worker *worker) - __must_hold(wq->lock) +static void __io_worker_idle(struct io_wq_acct *acct, struct io_worker *worker) + __must_hold(acct->workers_lock) { if (!test_bit(IO_WORKER_F_FREE, &worker->flags)) { set_bit(IO_WORKER_F_FREE, &worker->flags); - hlist_nulls_add_head_rcu(&worker->nulls_node, &wq->free_list); + hlist_nulls_add_head_rcu(&worker->nulls_node, &acct->free_list); } } +static inline unsigned int __io_get_work_hash(unsigned int work_flags) +{ + return work_flags >> IO_WQ_HASH_SHIFT; +} + static inline unsigned int io_get_work_hash(struct io_wq_work *work) { - return atomic_read(&work->flags) >> IO_WQ_HASH_SHIFT; + return __io_get_work_hash(atomic_read(&work->flags)); } static bool io_wait_on_hash(struct io_wq *wq, unsigned int hash) @@ -475,26 +485,27 @@ static bool io_wait_on_hash(struct io_wq *wq, unsigned int hash) } static struct io_wq_work *io_get_next_work(struct io_wq_acct *acct, - struct io_worker *worker) + struct io_wq *wq) __must_hold(acct->lock) { struct io_wq_work_node *node, *prev; struct io_wq_work *work, *tail; unsigned int stall_hash = -1U; - struct io_wq *wq = worker->wq; wq_list_for_each(node, prev, &acct->work_list) { + unsigned int work_flags; unsigned int hash; work = container_of(node, struct io_wq_work, list); /* not hashed, can run anytime */ - if (!io_wq_is_hashed(work)) { + work_flags = atomic_read(&work->flags); + if (!__io_wq_is_hashed(work_flags)) { wq_list_del(&acct->work_list, node, prev); return work; } - hash = io_get_work_hash(work); + hash = __io_get_work_hash(work_flags); /* all items with this hash lie in [work, tail] */ tail = wq->hash_tail[hash]; @@ -564,7 +575,7 @@ static void io_worker_handle_work(struct io_wq_acct *acct, * can't make progress, any work completion or insertion will * clear the stalled flag. */ - work = io_get_next_work(acct, worker); + work = io_get_next_work(acct, wq); if (work) { /* * Make sure cancelation can find this, even before @@ -583,7 +594,7 @@ static void io_worker_handle_work(struct io_wq_acct *acct, if (!work) break; - __io_worker_busy(wq, worker); + __io_worker_busy(acct, worker); io_assign_current_work(worker, work); __set_current_state(TASK_RUNNING); @@ -591,12 +602,15 @@ static void io_worker_handle_work(struct io_wq_acct *acct, /* handle a whole dependent link */ do { struct io_wq_work *next_hashed, *linked; - unsigned int hash = io_get_work_hash(work); + unsigned int work_flags = atomic_read(&work->flags); + unsigned int hash = __io_wq_is_hashed(work_flags) + ? __io_get_work_hash(work_flags) + : -1U; next_hashed = wq_next_work(work); if (do_kill && - (atomic_read(&work->flags) & IO_WQ_WORK_UNBOUND)) + (work_flags & IO_WQ_WORK_UNBOUND)) atomic_or(IO_WQ_WORK_CANCEL, &work->flags); wq->do_work(work); io_assign_current_work(worker, NULL); @@ -654,20 +668,20 @@ static int io_wq_worker(void *data) while (io_acct_run_queue(acct)) io_worker_handle_work(acct, worker); - raw_spin_lock(&wq->lock); + raw_spin_lock(&acct->workers_lock); /* * Last sleep timed out. Exit if we're not the last worker, * or if someone modified our affinity. */ if (last_timeout && (exit_mask || acct->nr_workers > 1)) { acct->nr_workers--; - raw_spin_unlock(&wq->lock); + raw_spin_unlock(&acct->workers_lock); __set_current_state(TASK_RUNNING); break; } last_timeout = false; - __io_worker_idle(wq, worker); - raw_spin_unlock(&wq->lock); + __io_worker_idle(acct, worker); + raw_spin_unlock(&acct->workers_lock); if (io_run_task_work()) continue; ret = schedule_timeout(WORKER_IDLE_TIMEOUT); @@ -728,18 +742,18 @@ void io_wq_worker_sleeping(struct task_struct *tsk) io_wq_dec_running(worker); } -static void io_init_new_worker(struct io_wq *wq, struct io_worker *worker, +static void io_init_new_worker(struct io_wq *wq, struct io_wq_acct *acct, struct io_worker *worker, struct task_struct *tsk) { tsk->worker_private = worker; worker->task = tsk; set_cpus_allowed_ptr(tsk, wq->cpu_mask); - raw_spin_lock(&wq->lock); - hlist_nulls_add_head_rcu(&worker->nulls_node, &wq->free_list); - list_add_tail_rcu(&worker->all_list, &wq->all_list); + raw_spin_lock(&acct->workers_lock); + hlist_nulls_add_head_rcu(&worker->nulls_node, &acct->free_list); + list_add_tail_rcu(&worker->all_list, &acct->all_list); set_bit(IO_WORKER_F_FREE, &worker->flags); - raw_spin_unlock(&wq->lock); + raw_spin_unlock(&acct->workers_lock); wake_up_new_task(tsk); } @@ -787,20 +801,20 @@ static void create_worker_cont(struct callback_head *cb) struct io_worker *worker; struct task_struct *tsk; struct io_wq *wq; + struct io_wq_acct *acct; worker = container_of(cb, struct io_worker, create_work); clear_bit_unlock(0, &worker->create_state); wq = worker->wq; + acct = io_wq_get_acct(worker); tsk = create_io_thread(io_wq_worker, worker, NUMA_NO_NODE); if (!IS_ERR(tsk)) { - io_init_new_worker(wq, worker, tsk); + io_init_new_worker(wq, acct, worker, tsk); io_worker_release(worker); return; } else if (!io_should_retry_thread(worker, PTR_ERR(tsk))) { - struct io_wq_acct *acct = io_wq_get_acct(worker); - atomic_dec(&acct->nr_running); - raw_spin_lock(&wq->lock); + raw_spin_lock(&acct->workers_lock); acct->nr_workers--; if (!acct->nr_workers) { struct io_cb_cancel_data match = { @@ -808,11 +822,11 @@ static void create_worker_cont(struct callback_head *cb) .cancel_all = true, }; - raw_spin_unlock(&wq->lock); + raw_spin_unlock(&acct->workers_lock); while (io_acct_cancel_pending_work(wq, acct, &match)) ; } else { - raw_spin_unlock(&wq->lock); + raw_spin_unlock(&acct->workers_lock); } io_worker_ref_put(wq); kfree(worker); @@ -834,9 +848,8 @@ static void io_workqueue_create(struct work_struct *work) kfree(worker); } -static bool create_io_worker(struct io_wq *wq, int index) +static bool create_io_worker(struct io_wq *wq, struct io_wq_acct *acct) { - struct io_wq_acct *acct = &wq->acct[index]; struct io_worker *worker; struct task_struct *tsk; @@ -846,24 +859,22 @@ static bool create_io_worker(struct io_wq *wq, int index) if (!worker) { fail: atomic_dec(&acct->nr_running); - raw_spin_lock(&wq->lock); + raw_spin_lock(&acct->workers_lock); acct->nr_workers--; - raw_spin_unlock(&wq->lock); + raw_spin_unlock(&acct->workers_lock); io_worker_ref_put(wq); return false; } refcount_set(&worker->ref, 1); worker->wq = wq; + worker->acct = acct; raw_spin_lock_init(&worker->lock); init_completion(&worker->ref_done); - if (index == IO_WQ_ACCT_BOUND) - set_bit(IO_WORKER_F_BOUND, &worker->flags); - tsk = create_io_thread(io_wq_worker, worker, NUMA_NO_NODE); if (!IS_ERR(tsk)) { - io_init_new_worker(wq, worker, tsk); + io_init_new_worker(wq, acct, worker, tsk); } else if (!io_should_retry_thread(worker, PTR_ERR(tsk))) { kfree(worker); goto fail; @@ -879,14 +890,14 @@ fail: * Iterate the passed in list and call the specific function for each * worker that isn't exiting */ -static bool io_wq_for_each_worker(struct io_wq *wq, - bool (*func)(struct io_worker *, void *), - void *data) +static bool io_acct_for_each_worker(struct io_wq_acct *acct, + bool (*func)(struct io_worker *, void *), + void *data) { struct io_worker *worker; bool ret = false; - list_for_each_entry_rcu(worker, &wq->all_list, all_list) { + list_for_each_entry_rcu(worker, &acct->all_list, all_list) { if (io_worker_get(worker)) { /* no task if node is/was offline */ if (worker->task) @@ -900,6 +911,18 @@ static bool io_wq_for_each_worker(struct io_wq *wq, return ret; } +static bool io_wq_for_each_worker(struct io_wq *wq, + bool (*func)(struct io_worker *, void *), + void *data) +{ + for (int i = 0; i < IO_WQ_ACCT_NR; i++) { + if (!io_acct_for_each_worker(&wq->acct[i], func, data)) + return false; + } + + return true; +} + static bool io_wq_worker_wake(struct io_worker *worker, void *data) { __set_notify_signal(worker->task); @@ -916,19 +939,19 @@ static void io_run_cancel(struct io_wq_work *work, struct io_wq *wq) } while (work); } -static void io_wq_insert_work(struct io_wq *wq, struct io_wq_work *work) +static void io_wq_insert_work(struct io_wq *wq, struct io_wq_acct *acct, + struct io_wq_work *work, unsigned int work_flags) { - struct io_wq_acct *acct = io_work_get_acct(wq, work); unsigned int hash; struct io_wq_work *tail; - if (!io_wq_is_hashed(work)) { + if (!__io_wq_is_hashed(work_flags)) { append: wq_list_add_tail(&work->list, &acct->work_list); return; } - hash = io_get_work_hash(work); + hash = __io_get_work_hash(work_flags); tail = wq->hash_tail[hash]; wq->hash_tail[hash] = work; if (!tail) @@ -944,8 +967,8 @@ static bool io_wq_work_match_item(struct io_wq_work *work, void *data) void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work) { - struct io_wq_acct *acct = io_work_get_acct(wq, work); unsigned int work_flags = atomic_read(&work->flags); + struct io_wq_acct *acct = io_work_get_acct(wq, work_flags); struct io_cb_cancel_data match = { .fn = io_wq_work_match_item, .data = work, @@ -964,12 +987,12 @@ void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work) } raw_spin_lock(&acct->lock); - io_wq_insert_work(wq, work); + io_wq_insert_work(wq, acct, work, work_flags); clear_bit(IO_ACCT_STALLED_BIT, &acct->flags); raw_spin_unlock(&acct->lock); rcu_read_lock(); - do_create = !io_wq_activate_free_worker(wq, acct); + do_create = !io_acct_activate_free_worker(acct); rcu_read_unlock(); if (do_create && ((work_flags & IO_WQ_WORK_CONCURRENT) || @@ -980,12 +1003,12 @@ void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work) if (likely(did_create)) return; - raw_spin_lock(&wq->lock); + raw_spin_lock(&acct->workers_lock); if (acct->nr_workers) { - raw_spin_unlock(&wq->lock); + raw_spin_unlock(&acct->workers_lock); return; } - raw_spin_unlock(&wq->lock); + raw_spin_unlock(&acct->workers_lock); /* fatal condition, failed to create the first worker */ io_acct_cancel_pending_work(wq, acct, &match); @@ -1034,10 +1057,10 @@ static bool io_wq_worker_cancel(struct io_worker *worker, void *data) } stat |
